From 03901b9a7a1970162fa71fd3892905496ef892c3 Mon Sep 17 00:00:00 2001 From: huanghaibin <284824253@qq.com> Date: Sat, 30 Dec 2023 12:59:46 +0800 Subject: [PATCH] [enhancement](group_commit): refector relay wal code (#29183) --- be/src/http/action/http_stream.cpp | 16 +- be/src/http/action/http_stream.h | 2 +- be/src/olap/wal_dirs_info.cpp | 1 + be/src/olap/wal_info.cpp | 41 ++ be/src/olap/wal_info.h | 38 ++ be/src/olap/wal_manager.cpp | 41 +- be/src/olap/wal_manager.h | 17 +- be/src/olap/wal_table.cpp | 428 ++++++++---------- be/src/olap/wal_table.h | 32 +- be/src/runtime/group_commit_mgr.cpp | 26 +- .../runtime/stream_load/stream_load_context.h | 1 + be/src/vec/exec/format/wal/wal_reader.cpp | 2 - be/src/vec/sink/writer/vwal_writer.cpp | 2 +- be/test/olap/wal_manager_test.cpp | 6 +- .../doris/analysis/NativeInsertStmt.java | 2 +- gensrc/thrift/FrontendService.thrift | 14 +- 16 files changed, 346 insertions(+), 323 deletions(-) create mode 100644 be/src/olap/wal_info.cpp create mode 100644 be/src/olap/wal_info.h diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp index a18b10f491fc1a..15d2a1a18d1bed 100644 --- a/be/src/http/action/http_stream.cpp +++ b/be/src/http/action/http_stream.cpp @@ -256,7 +256,7 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) { } else { LOG(INFO) << "use a portion of data to request fe to obtain column information"; ctx->is_read_schema = false; - ctx->status = _process_put(req, ctx); + ctx->status = process_put(req, ctx); } } @@ -272,7 +272,7 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) { LOG(INFO) << "after all the data has been read and it has not reached 1M, it will execute " << "here"; ctx->is_read_schema = false; - ctx->status = _process_put(req, ctx); + ctx->status = process_put(req, ctx); } ctx->read_data_cost_nanos += (MonotonicNanos() - start_read_data_time); } @@ -290,11 +290,15 @@ void HttpStreamAction::free_handler_ctx(std::shared_ptr param) { ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id); } -Status HttpStreamAction::_process_put(HttpRequest* http_req, - std::shared_ptr ctx) { +Status HttpStreamAction::process_put(HttpRequest* http_req, + std::shared_ptr ctx) { TStreamLoadPutRequest request; set_request_auth(&request, ctx->auth); - request.__set_load_sql(http_req->header(HTTP_SQL)); + if (http_req != nullptr) { + request.__set_load_sql(http_req->header(HTTP_SQL)); + } else { + request.__set_load_sql(ctx->sql_str); + } request.__set_loadId(ctx->id.to_thrift()); request.__set_label(ctx->label); if (ctx->group_commit) { @@ -330,7 +334,7 @@ Status HttpStreamAction::_process_put(HttpRequest* http_req, ctx->txn_id = ctx->put_result.params.txn_conf.txn_id; ctx->label = ctx->put_result.params.import_label; ctx->put_result.params.__set_wal_id(ctx->wal_id); - if (http_req->header(HTTP_GROUP_COMMIT) == "async_mode") { + if (http_req != nullptr && http_req->header(HTTP_GROUP_COMMIT) == "async_mode") { if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) { size_t content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH)); if (ctx->format == TFileFormatType::FORMAT_CSV_GZ || diff --git a/be/src/http/action/http_stream.h b/be/src/http/action/http_stream.h index d4140a118d61e3..90a8b48fb2bb77 100644 --- a/be/src/http/action/http_stream.h +++ b/be/src/http/action/http_stream.h @@ -43,11 +43,11 @@ class HttpStreamAction : public HttpHandler { void on_chunk_data(HttpRequest* req) override; void free_handler_ctx(std::shared_ptr ctx) override; + Status process_put(HttpRequest* http_req, std::shared_ptr ctx); private: Status _on_header(HttpRequest* http_req, std::shared_ptr ctx); Status _handle(HttpRequest* req, std::shared_ptr ctx); - Status _process_put(HttpRequest* http_req, std::shared_ptr ctx); void _save_stream_load_record(std::shared_ptr ctx, const std::string& str); Status _handle_group_commit(HttpRequest* http_req, std::shared_ptr ctx); diff --git a/be/src/olap/wal_dirs_info.cpp b/be/src/olap/wal_dirs_info.cpp index 340d896a8c60db..1b7216b0724598 100644 --- a/be/src/olap/wal_dirs_info.cpp +++ b/be/src/olap/wal_dirs_info.cpp @@ -169,6 +169,7 @@ size_t WalDirsInfo::get_max_available_size() { Status WalDirsInfo::update_wal_dir_limit(std::string wal_dir, size_t limit) { for (const auto& wal_dir_info : _wal_dirs_info_vec) { + LOG(INFO) << "wal_dir_info:" << wal_dir_info->get_wal_dir(); if (wal_dir_info->get_wal_dir() == wal_dir) { return wal_dir_info->update_wal_dir_limit(limit); } diff --git a/be/src/olap/wal_info.cpp b/be/src/olap/wal_info.cpp new file mode 100644 index 00000000000000..d93593cfaf026d --- /dev/null +++ b/be/src/olap/wal_info.cpp @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/wal_info.h" +namespace doris { +WalInfo::WalInfo(int64_t wal_id, std::string wal_path, int64_t retry_num, int64_t start_time_ms) + : _wal_id(wal_id), + _wal_path(wal_path), + _retry_num(retry_num), + _start_time_ms(start_time_ms) {} +WalInfo::~WalInfo() {} +int64_t WalInfo::get_wal_id() { + return _wal_id; +} +std::string WalInfo::get_wal_path() { + return _wal_path; +} +int64_t WalInfo::get_retry_num() { + return _retry_num; +} +int64_t WalInfo::get_start_time_ms() { + return _start_time_ms; +} +void WalInfo::add_retry_num() { + _retry_num++; +} +} // namespace doris \ No newline at end of file diff --git a/be/src/olap/wal_info.h b/be/src/olap/wal_info.h new file mode 100644 index 00000000000000..0383ac68f2ce36 --- /dev/null +++ b/be/src/olap/wal_info.h @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once +#include "runtime/exec_env.h" + +namespace doris { +class WalInfo { +public: + WalInfo(int64_t wal_id, std::string wal_path, int64_t retry_num, int64_t start_time_ms); + ~WalInfo(); + int64_t get_wal_id(); + int64_t get_retry_num(); + int64_t get_start_time_ms(); + std::string get_wal_path(); + void add_retry_num(); + +private: + int64_t _wal_id; + std::string _wal_path; + int64_t _retry_num; + int64_t _start_time_ms; +}; + +} // namespace doris \ No newline at end of file diff --git a/be/src/olap/wal_manager.cpp b/be/src/olap/wal_manager.cpp index 0fe59cdf9cac3f..a9c35794698698 100644 --- a/be/src/olap/wal_manager.cpp +++ b/be/src/olap/wal_manager.cpp @@ -82,6 +82,9 @@ Status WalManager::init() { RETURN_IF_ERROR(_init_wal_dirs_conf()); RETURN_IF_ERROR(_init_wal_dirs()); RETURN_IF_ERROR(_init_wal_dirs_info()); + for (auto wal_dir : _wal_dirs) { + RETURN_IF_ERROR(scan_wals(wal_dir)); + } return Thread::create( "WalMgr", "replay_wal", [this]() { static_cast(this->replay()); }, &_replay_thread); @@ -112,7 +115,7 @@ Status WalManager::_init_wal_dirs_conf() { Status WalManager::_init_wal_dirs() { bool exists = false; for (auto wal_dir : _wal_dirs) { - std::string tmp_dir = wal_dir + "/tmp"; + std::string tmp_dir = wal_dir + "/" + tmp; LOG(INFO) << "wal_dir:" << wal_dir << ",tmp_dir:" << tmp_dir; RETURN_IF_ERROR(io::global_local_filesystem()->exists(wal_dir, &exists)); if (!exists) { @@ -122,7 +125,6 @@ Status WalManager::_init_wal_dirs() { if (!exists) { RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(tmp_dir)); } - RETURN_IF_ERROR(scan_wals(wal_dir)); } return Status::OK(); } @@ -164,15 +166,15 @@ Status WalManager::_init_wal_dirs_info() { &_update_wal_dirs_info_thread); } -void WalManager::add_wal_status_queue(int64_t table_id, int64_t wal_id, WAL_STATUS wal_status) { +void WalManager::add_wal_status_queue(int64_t table_id, int64_t wal_id, WalStatus wal_status) { std::lock_guard wrlock(_wal_status_lock); LOG(INFO) << "add wal queue " << ",table_id:" << table_id << ",wal_id:" << wal_id << ",status:" << wal_status; auto it = _wal_status_queues.find(table_id); if (it == _wal_status_queues.end()) { - std::unordered_map tmp; - tmp.emplace(wal_id, wal_status); - _wal_status_queues.emplace(table_id, tmp); + std::unordered_map tmp_map; + tmp_map.emplace(wal_id, wal_status); + _wal_status_queues.emplace(table_id, tmp_map); } else { it->second.emplace(wal_id, wal_status); } @@ -305,12 +307,12 @@ Status WalManager::scan_wals(const std::string& wal_path) { LOG(WARNING) << "Failed list files for dir=" << wal_path << ", st=" << st.to_string(); return st; } - for (const auto& db_id : dbs) { - if (db_id.is_file) { + for (const auto& database_id : dbs) { + if (database_id.is_file || database_id.file_name == tmp) { continue; } std::vector tables; - auto db_path = wal_path + "/" + db_id.file_name; + auto db_path = wal_path + "/" + database_id.file_name; st = io::global_local_filesystem()->list(db_path, false, &tables, &exists); if (!st.ok()) { LOG(WARNING) << "Failed list files for dir=" << db_path << ", st=" << st.to_string(); @@ -342,20 +344,16 @@ Status WalManager::scan_wals(const std::string& wal_path) { int64_t wal_id = std::strtoll(wal.file_name.substr(0, pos).c_str(), NULL, 10); _wal_path_map.emplace(wal_id, wal_file); + int64_t db_id = std::strtoll(database_id.file_name.c_str(), NULL, 10); int64_t tb_id = std::strtoll(table_id.file_name.c_str(), NULL, 10); - add_wal_status_queue(tb_id, wal_id, WalManager::WAL_STATUS::REPLAY); + add_wal_status_queue(tb_id, wal_id, WalManager::WalStatus::REPLAY); + RETURN_IF_ERROR(add_recover_wal(db_id, tb_id, wal_id, wal_file)); } catch (const std::invalid_argument& e) { return Status::InvalidArgument("Invalid format, {}", e.what()); } } } - st = add_recover_wal(std::stoll(db_id.file_name), std::stoll(table_id.file_name), res); count += res.size(); - if (!st.ok()) { - LOG(WARNING) << "Failed add replay wal, db=" << db_id.file_name - << ", table=" << table_id.file_name << ", st=" << st.to_string(); - return st; - } } } LOG(INFO) << "Finish list all wals, size:" << count; @@ -396,7 +394,8 @@ Status WalManager::replay() { return Status::OK(); } -Status WalManager::add_recover_wal(int64_t db_id, int64_t table_id, std::vector wals) { +Status WalManager::add_recover_wal(int64_t db_id, int64_t table_id, int64_t wal_id, + std::string wal) { std::lock_guard wrlock(_lock); std::shared_ptr table_ptr; auto it = _table_map.find(table_id); @@ -406,12 +405,10 @@ Status WalManager::add_recover_wal(int64_t db_id, int64_t table_id, std::vector< } else { table_ptr = it->second; } - table_ptr->add_wals(wals); + table_ptr->add_wal(wal_id, wal); #ifndef BE_TEST - for (auto wal : wals) { - RETURN_IF_ERROR(update_wal_dir_limit(_get_base_wal_path(wal))); - RETURN_IF_ERROR(update_wal_dir_used(_get_base_wal_path(wal))); - } + RETURN_IF_ERROR(update_wal_dir_limit(_get_base_wal_path(wal))); + RETURN_IF_ERROR(update_wal_dir_used(_get_base_wal_path(wal))); #endif return Status::OK(); } diff --git a/be/src/olap/wal_manager.h b/be/src/olap/wal_manager.h index 8d330a64bb9fc8..4b42beaf45551e 100644 --- a/be/src/olap/wal_manager.h +++ b/be/src/olap/wal_manager.h @@ -47,7 +47,7 @@ class WalManager { ENABLE_FACTORY_CREATOR(WalManager); public: - enum WAL_STATUS { + enum WalStatus { PREPARE = 0, REPLAY, CREATE, @@ -56,6 +56,7 @@ class WalManager { public: WalManager(ExecEnv* exec_env, const std::string& wal_dir); ~WalManager(); + // used for wal Status delete_wal(int64_t wal_id, size_t block_queue_pre_allocated = 0); Status init(); Status scan_wals(const std::string& wal_path); @@ -64,13 +65,13 @@ class WalManager { Status create_wal_writer(int64_t wal_id, std::shared_ptr& wal_writer); Status scan(); size_t get_wal_table_size(int64_t table_id); - Status add_recover_wal(int64_t db_id, int64_t table_id, std::vector wals); + Status add_recover_wal(int64_t db_id, int64_t table_id, int64_t wal_id, std::string wal); Status add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, const std::string& label, std::string& base_path); Status get_wal_path(int64_t wal_id, std::string& wal_path); Status get_wal_status_queue_size(const PGetWalQueueSizeRequest* request, PGetWalQueueSizeResponse* response); - void add_wal_status_queue(int64_t table_id, int64_t wal_id, WAL_STATUS wal_status); + void add_wal_status_queue(int64_t table_id, int64_t wal_id, WalStatus wal_status); Status erase_wal_status_queue(int64_t table_id, int64_t wal_id); void print_wal_status_queue(); void stop(); @@ -80,6 +81,7 @@ class WalManager { void erase_wal_column_index(int64_t wal_id); Status get_wal_column_index(int64_t wal_id, std::vector& column_index); + // used for limit Status update_wal_dir_limit(const std::string& wal_dir, size_t limit = -1); Status update_wal_dir_used(const std::string& wal_dir, size_t used = -1); Status update_wal_dir_pre_allocated(const std::string& wal_dir, size_t pre_allocated, @@ -88,6 +90,7 @@ class WalManager { size_t get_max_available_size(); private: + // used for limit Status _init_wal_dirs_conf(); Status _init_wal_dirs(); Status _init_wal_dirs_info(); @@ -99,11 +102,13 @@ class WalManager { // used for be ut size_t wal_limit_test_bytes; + const std::string tmp = "tmp"; + private: + //used for wal ExecEnv* _exec_env = nullptr; std::shared_mutex _lock; scoped_refptr _replay_thread; - scoped_refptr _update_wal_dirs_info_thread; CountDownLatch _stop_background_threads_latch; std::map> _table_map; std::vector _wal_dirs; @@ -111,11 +116,13 @@ class WalManager { std::shared_mutex _wal_status_lock; std::unordered_map _wal_path_map; std::unordered_map> _wal_id_to_writer_map; - std::unordered_map> _wal_status_queues; + std::unordered_map> _wal_status_queues; std::atomic _stop; std::shared_mutex _wal_column_id_map_lock; std::unordered_map&> _wal_column_id_map; std::unique_ptr _thread_pool; + // used for limit + scoped_refptr _update_wal_dirs_info_thread; std::unique_ptr _wal_dirs_info; }; } // namespace doris \ No newline at end of file diff --git a/be/src/olap/wal_table.cpp b/be/src/olap/wal_table.cpp index 7f98c410b0779a..a1c9a12f3e1750 100644 --- a/be/src/olap/wal_table.cpp +++ b/be/src/olap/wal_table.cpp @@ -17,19 +17,16 @@ #include "olap/wal_table.h" -#include -#include -#include -#include #include -#include "evhttp.h" +#include "http/action/http_stream.h" #include "http/action/stream_load.h" #include "http/ev_http_server.h" #include "http/http_common.h" #include "http/http_headers.h" #include "http/utils.h" #include "io/fs/local_file_system.h" +#include "io/fs/stream_load_pipe.h" #include "olap/wal_manager.h" #include "runtime/client_cache.h" #include "runtime/fragment_mgr.h" @@ -41,125 +38,149 @@ namespace doris { WalTable::WalTable(ExecEnv* exec_env, int64_t db_id, int64_t table_id) - : _exec_env(exec_env), _db_id(db_id), _table_id(table_id), _stop(false) {} + : _exec_env(exec_env), _db_id(db_id), _table_id(table_id), _stop(false) { + _http_stream_action = std::make_shared(exec_env); +} WalTable::~WalTable() {} #ifdef BE_TEST -std::string k_request_line; +Status k_stream_load_exec_status; #endif -bool retry = false; - -void WalTable::add_wals(std::vector wals) { +void WalTable::add_wal(int64_t wal_id, std::string wal) { std::lock_guard lock(_replay_wal_lock); - for (const auto& wal : wals) { - LOG(INFO) << "add replay wal " << wal; - _replay_wal_map.emplace(wal, replay_wal_info {0, UnixMillis(), false}); - } + LOG(INFO) << "add replay wal " << wal; + auto wal_info = std::make_shared(wal_id, wal, 0, UnixMillis()); + _replay_wal_map.emplace(wal, wal_info); } -Status WalTable::replay_wals() { +void WalTable::pick_relay_wals() { + std::lock_guard lock(_replay_wal_lock); std::vector need_replay_wals; std::vector need_erase_wals; - { - std::lock_guard lock(_replay_wal_lock); - if (_replay_wal_map.empty()) { - return Status::OK(); - } - VLOG_DEBUG << "Start replay wals for db=" << _db_id << ", table=" << _table_id - << ", wal size=" << _replay_wal_map.size(); - for (auto& [wal, info] : _replay_wal_map) { - auto& [retry_num, start_ts, replaying] = info; - if (replaying) { - LOG(INFO) << wal << " is replaying, skip this round"; - return Status::OK(); - } - if (retry_num >= config::group_commit_replay_wal_retry_num) { - LOG(WARNING) << "All replay wal failed, db=" << _db_id << ", table=" << _table_id - << ", wal=" << wal - << ", retry_num=" << config::group_commit_replay_wal_retry_num; - std::string rename_path = _get_tmp_path(wal); - LOG(INFO) << "rename wal from " << wal << " to " << rename_path; - std::rename(wal.c_str(), rename_path.c_str()); - need_erase_wals.push_back(wal); - continue; - } - if (_need_replay(info)) { - need_replay_wals.push_back(wal); + for (auto it = _replay_wal_map.begin(); it != _replay_wal_map.end(); it++) { + auto wal_info = it->second; + if (wal_info->get_retry_num() >= config::group_commit_replay_wal_retry_num) { + LOG(WARNING) << "All replay wal failed, db=" << _db_id << ", table=" << _table_id + << ", wal=" << it->first << ", retry_num=" << wal_info->get_retry_num(); + auto st = _rename_to_tmp_path(it->first); + if (!st.ok()) { + LOG(WARNING) << "rename " << it->first << " fail" + << ",st:" << st.to_string(); } + need_erase_wals.push_back(it->first); + continue; } - std::sort(need_replay_wals.begin(), need_replay_wals.end()); - for (const auto& wal : need_erase_wals) { - if (_replay_wal_map.erase(wal)) { - LOG(INFO) << "erase wal " << wal << " from _replay_wal_map"; - } else { - LOG(WARNING) << "fail to erase wal " << wal << " from _replay_wal_map"; - } + if (_need_replay(wal_info)) { + need_replay_wals.push_back(it->first); } } + for (const auto& wal : need_erase_wals) { + _replay_wal_map.erase(wal); + } + std::sort(need_replay_wals.begin(), need_replay_wals.end()); for (const auto& wal : need_replay_wals) { + _replaying_queue.emplace_back(_replay_wal_map[wal]); + _replay_wal_map.erase(wal); + } +} + +Status WalTable::relay_wal_one_by_one() { + std::vector> need_retry_wals; + std::vector> need_delete_wals; + while (!_replaying_queue.empty()) { + std::shared_ptr wal_info = nullptr; { std::lock_guard lock(_replay_wal_lock); - if (_stop.load()) { - break; + wal_info = _replaying_queue.front(); + _replaying_queue.pop_front(); + } + wal_info->add_retry_num(); + auto st = _replay_wal_internal(wal_info->get_wal_path()); + if (!st.ok()) { + LOG(WARNING) << "failed replay wal, db=" << _db_id << ", table=" << _table_id + << ", wal=" << wal_info->get_wal_path() << ", st=" << st.to_string(); + if (!st.is()) { + need_retry_wals.push_back(wal_info); } else { - auto it = _replay_wal_map.find(wal); - if (it != _replay_wal_map.end()) { - auto& [retry_num, start_time, replaying] = it->second; - replaying = true; - } + need_delete_wals.push_back(wal_info); } + } else { + need_delete_wals.push_back(wal_info); + } + VLOG_NOTICE << "replay wal, db=" << _db_id << ", table=" << _table_id + << ", wal=" << wal_info->get_wal_path() << ", st=" << st.to_string(); + } + { + std::lock_guard lock(_replay_wal_lock); + for (auto retry_wal_info : need_retry_wals) { + _replay_wal_map.emplace(retry_wal_info->get_wal_path(), retry_wal_info); } - auto st = _replay_wal_internal(wal); + } + for (auto delete_wal_info : need_delete_wals) { + auto st = _delete_wal(delete_wal_info->get_wal_id()); if (!st.ok()) { - std::lock_guard lock(_replay_wal_lock); - auto it = _replay_wal_map.find(wal); - if (it != _replay_wal_map.end()) { - auto& [retry_num, start_time, replaying] = it->second; - replaying = false; - } - LOG(WARNING) << "failed replay wal, drop this round, db=" << _db_id - << ", table=" << _table_id << ", wal=" << wal << ", st=" << st.to_string(); - break; + LOG(WARNING) << "fail to delete wal " << delete_wal_info->get_wal_path(); } - VLOG_NOTICE << "replay wal, db=" << _db_id << ", table=" << _table_id << ", label=" << wal - << ", st=" << st.to_string(); } return Status::OK(); } - -std::string WalTable::_get_tmp_path(const std::string wal) { - std::vector path_element; - doris::vectorized::WalReader::string_split(wal, "/", path_element); - std::stringstream ss; - int index = 0; - while (index < path_element.size() - 3) { - ss << path_element[index] << "/"; - index++; +Status WalTable::replay_wals() { + { + std::lock_guard lock(_replay_wal_lock); + if (_replay_wal_map.empty()) { + LOG(INFO) << "_replay_wal_map is empty,skip relaying"; + return Status::OK(); + } + if (!_replaying_queue.empty()) { + LOG(INFO) << "_replaying_queue is not empty,skip relaying"; + return Status::OK(); + } } - ss << "tmp/"; - while (index < path_element.size()) { - if (index != path_element.size() - 1) { - ss << path_element[index] << "_"; - } else { - ss << path_element[index]; + VLOG_DEBUG << "Start replay wals for db=" << _db_id << ", table=" << _table_id + << ", wal size=" << _replay_wal_map.size(); + pick_relay_wals(); + RETURN_IF_ERROR(relay_wal_one_by_one()); + return Status::OK(); +} + +Status WalTable::_rename_to_tmp_path(const std::string wal) { + io::Path wal_path = wal; + std::list path_element; + for (int i = 0; i < 3; ++i) { + if (!wal_path.has_parent_path()) { + return Status::InternalError("parent path is not enough when rename " + wal); } - index++; + path_element.push_front(wal_path.filename().string()); + wal_path = wal_path.parent_path(); } - return ss.str(); + wal_path.append(_exec_env->wal_mgr()->tmp); + for (auto path : path_element) { + wal_path.append(path); + } + bool exists = false; + RETURN_IF_ERROR(io::global_local_filesystem()->exists(wal_path.parent_path(), &exists)); + if (!exists) { + RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(wal_path.parent_path())); + } + auto res = std::rename(wal.c_str(), wal_path.string().c_str()); + if (res < 0) { + return Status::InternalError("rename fail on path " + wal); + } + LOG(INFO) << "rename wal from " << wal << " to " << wal_path.string(); + return Status::OK(); } -bool WalTable::_need_replay(const doris::WalTable::replay_wal_info& info) { +bool WalTable::_need_replay(std::shared_ptr wal_info) { #ifndef BE_TEST - auto& [retry_num, start_ts, replaying] = info; - auto replay_interval = - pow(2, retry_num) * config::group_commit_replay_wal_retry_interval_seconds * 1000; - return UnixMillis() - start_ts >= replay_interval; + auto replay_interval = pow(2, wal_info->get_retry_num()) * + config::group_commit_replay_wal_retry_interval_seconds * 1000; + return UnixMillis() - wal_info->get_start_time_ms() >= replay_interval; #else return true; #endif } -Status WalTable::_abort_txn(int64_t db_id, int64_t wal_id) { +Status WalTable::_try_abort_txn(int64_t db_id, int64_t wal_id) { TLoadTxnRollbackRequest request; request.__set_auth_code(0); // this is a fake, fe not check it now request.__set_db_id(db_id); @@ -181,47 +202,23 @@ Status WalTable::_abort_txn(int64_t db_id, int64_t wal_id) { Status WalTable::_replay_wal_internal(const std::string& wal) { LOG(INFO) << "Start replay wal for db=" << _db_id << ", table=" << _table_id << ", wal=" << wal; - // start a new stream load - { - std::lock_guard lock(_replay_wal_lock); - auto it = _replay_wal_map.find(wal); - if (it != _replay_wal_map.end()) { - auto& [retry_num, start_time, replaying] = it->second; - ++retry_num; - replaying = true; - } else { - LOG(WARNING) << "can not find wal in stream load replay map. db=" << _db_id - << ", table=" << _table_id << ", wal=" << wal; - return Status::OK(); - } - } std::shared_ptr> pair = nullptr; - RETURN_IF_ERROR(_get_wal_info(wal, pair)); + RETURN_IF_ERROR(_parse_wal_path(wal, pair)); auto wal_id = pair->first; auto label = pair->second; #ifndef BE_TEST - auto st = _abort_txn(_db_id, wal_id); + auto st = _try_abort_txn(_db_id, wal_id); if (!st.ok()) { LOG(WARNING) << "abort txn " << wal_id << " fail"; } - auto get_st = _get_column_info(_db_id, _table_id); - if (!get_st.ok()) { - if (get_st.is()) { - { - std::lock_guard lock(_replay_wal_lock); - _replay_wal_map.erase(wal); - } - RETURN_IF_ERROR(_delete_wal(wal_id)); - } - return get_st; - } + RETURN_IF_ERROR(_get_column_info(_db_id, _table_id)); #endif - RETURN_IF_ERROR(_send_request(wal_id, wal, label)); + RETURN_IF_ERROR(_replay_one_txn_with_stremaload(wal_id, wal, label)); return Status::OK(); } -Status WalTable::_get_wal_info(const std::string& wal, - std::shared_ptr>& pair) { +Status WalTable::_parse_wal_path(const std::string& wal, + std::shared_ptr>& pair) { std::vector path_element; doris::vectorized::WalReader::string_split(wal, "/", path_element); auto pos = path_element[path_element.size() - 1].find("_"); @@ -236,171 +233,104 @@ Status WalTable::_get_wal_info(const std::string& wal, return Status::OK(); } -void http_request_done(struct evhttp_request* req, void* arg) { - std::stringstream out; - std::string status; - std::string msg; - std::string wal_id; - size_t len = 0; - if (req != nullptr) { - auto input = evhttp_request_get_input_buffer(req); - char* request_line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF); - while (request_line != nullptr) { - std::string s(request_line); - out << request_line; - request_line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF); - } - auto out_str = out.str(); - LOG(INFO) << "replay wal out_str:" << out_str; - rapidjson::Document doc; - if (!out_str.empty()) { - doc.Parse(out.str().c_str()); - status = std::string(doc["Status"].GetString()); - msg = std::string(doc["Message"].GetString()); - LOG(INFO) << "replay wal status:" << status << ",msg:" << msg; - if (status.find("Fail") != status.npos) { - if (msg.find("Label") != msg.npos && - msg.find("has already been used") != msg.npos) { - retry = false; - } else { - retry = true; - } - } else { - retry = false; - } - } else { - retry = true; - } - } else { - LOG(WARNING) << "req is null"; - } - - if (arg != nullptr) { - event_base_loopbreak((struct event_base*)arg); - } else { - LOG(WARNING) << "arg is null"; - } -} - -Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const std::string& label) { -#ifndef BE_TEST - struct event_base* base = nullptr; - struct evhttp_connection* conn = nullptr; - struct evhttp_request* req = nullptr; - retry = false; - event_init(); - base = event_base_new(); - conn = evhttp_connection_new("127.0.0.1", doris::config::webserver_port); - evhttp_connection_set_base(conn, base); - req = evhttp_request_new(http_request_done, base); - evhttp_add_header(req->output_headers, HTTP_LABEL_KEY.c_str(), label.c_str()); - evhttp_add_header(req->output_headers, HTTP_AUTH_CODE.c_str(), std::to_string(wal_id).c_str()); - evhttp_add_header(req->output_headers, HTTP_WAL_ID_KY.c_str(), std::to_string(wal_id).c_str()); +Status WalTable::_construct_sql_str(const std::string& wal, const std::string& label, + std::string& sql_str, std::vector& index_vector) { std::string columns; RETURN_IF_ERROR(_read_wal_header(wal, columns)); std::vector column_id_element; doris::vectorized::WalReader::string_split(columns, ",", column_id_element); - std::vector index_vector; std::stringstream ss_name; std::stringstream ss_id; int index_raw = 0; for (auto column_id_str : column_id_element) { try { int64_t column_id = std::strtoll(column_id_str.c_str(), NULL, 10); - auto it = _column_id_name_map.find(column_id); - auto it2 = _column_id_index_map.find(column_id); - if (it != _column_id_name_map.end() && it2 != _column_id_index_map.end()) { - ss_name << "`" << it->second << "`,"; - ss_id << "c" << std::to_string(_column_id_index_map[column_id]) << ","; + auto it = _column_id_info_map.find(column_id); + if (it != _column_id_info_map.end()) { + ss_name << "`" << it->second->first << "`,"; + ss_id << "c" << std::to_string(it->second->second) << ","; index_vector.emplace_back(index_raw); - _column_id_name_map.erase(column_id); - _column_id_index_map.erase(column_id); } index_raw++; } catch (const std::invalid_argument& e) { return Status::InvalidArgument("Invalid format, {}", e.what()); } } - _exec_env->wal_mgr()->add_wal_column_index(wal_id, index_vector); auto name = ss_name.str().substr(0, ss_name.str().size() - 1); auto id = ss_id.str().substr(0, ss_id.str().size() - 1); std::stringstream ss; ss << "insert into doris_internal_table_id(" << _table_id << ") WITH LABEL " << label << " (" << name << ") select " << id << " from http_stream(\"format\" = \"wal\", \"table_id\" = \"" << std::to_string(_table_id) << "\")"; - evhttp_add_header(req->output_headers, HTTP_SQL.c_str(), ss.str().c_str()); - evbuffer* output = evhttp_request_get_output_buffer(req); - evbuffer_add_printf(output, "replay wal %s", std::to_string(wal_id).c_str()); - - evhttp_make_request(conn, req, EVHTTP_REQ_PUT, "/api/_http_stream"); - evhttp_connection_set_timeout(req->evcon, 300); - - event_base_dispatch(base); - evhttp_connection_free(conn); - event_base_free(base); + sql_str = ss.str().data(); + return Status::OK(); +} +Status WalTable::_handle_stream_load(int64_t wal_id, const std::string& wal, + const std::string& label) { + std::string sql_str; + std::vector index_vector; + RETURN_IF_ERROR(_construct_sql_str(wal, label, sql_str, index_vector)); + _exec_env->wal_mgr()->add_wal_column_index(wal_id, index_vector); + std::shared_ptr ctx = std::make_shared(_exec_env); + ctx->sql_str = sql_str; + ctx->wal_id = wal_id; + ctx->auth.auth_code = wal_id; + ctx->label = label; + auto st = _http_stream_action->process_put(nullptr, ctx); + if (st.ok()) { + // wait stream load finish + RETURN_IF_ERROR(ctx->future.get()); + if (ctx->status.ok()) { + auto commit_st = _exec_env->stream_load_executor()->commit_txn(ctx.get()); + st = commit_st; + } else if (!ctx->status.ok()) { + LOG(WARNING) << "handle streaming load failed, id=" << ctx->id + << ", errmsg=" << ctx->status; + _exec_env->stream_load_executor()->rollback_txn(ctx.get()); + st = ctx->status; + } + } + _exec_env->wal_mgr()->erase_wal_column_index(wal_id); + LOG(INFO) << "relay wal id=" << wal_id << ",st=" << st.to_string(); + return st; +} +Status WalTable::_replay_one_txn_with_stremaload(int64_t wal_id, const std::string& wal, + const std::string& label) { + bool success = false; +#ifndef BE_TEST + auto st = _handle_stream_load(wal_id, wal, label); + auto msg = st.msg(); + success = st.ok() || st.is() || + msg.find("LabelAlreadyUsedException") != msg.npos; #else - std::stringstream out; - out << k_request_line; - auto out_str = out.str(); - rapidjson::Document doc; - doc.Parse(out_str.c_str()); - auto status = std::string(doc["Status"].GetString()); - if (status.find("Fail") != status.npos) { - retry = true; - } else { - retry = false; - } + success = k_stream_load_exec_status.ok(); #endif - if (retry) { - LOG(INFO) << "fail to replay wal =" << wal; - std::lock_guard lock(_replay_wal_lock); - auto it = _replay_wal_map.find(wal); - if (it != _replay_wal_map.end()) { - auto& [retry_num, start_time, replaying] = it->second; - replaying = false; - } else { - _replay_wal_map.emplace(wal, replay_wal_info {0, UnixMillis(), false}); - } - } else { + if (success) { LOG(INFO) << "success to replay wal =" << wal; - RETURN_IF_ERROR(_delete_wal(wal_id)); - std::lock_guard lock(_replay_wal_lock); - if (_replay_wal_map.erase(wal)) { - LOG(INFO) << "erase " << wal << " from _replay_wal_map"; - } else { - LOG(WARNING) << "fail to erase " << wal << " from _replay_wal_map"; - } + } else { + LOG(INFO) << "fail to replay wal =" << wal; + return Status::InternalError("fail to replay wal =" + wal); } - _exec_env->wal_mgr()->erase_wal_column_index(wal_id); return Status::OK(); } void WalTable::stop() { - bool done = true; do { { std::lock_guard lock(_replay_wal_lock); if (!this->_stop.load()) { this->_stop.store(true); } - auto it = _replay_wal_map.begin(); - for (; it != _replay_wal_map.end(); it++) { - auto& [retry_num, start_time, replaying] = it->second; - if (replaying) { - break; - } - } - if (it != _replay_wal_map.end()) { - done = false; - } else { - done = true; + if (_replay_wal_map.empty() && _replaying_queue.empty()) { + break; } - } - if (!done) { + LOG(INFO) << "stopping wal_table,wait for relay wal task done, now " + << _replay_wal_map.size() << " wals wait to replay, " + << _replaying_queue.size() << " wals are replaying"; std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } - } while (!done); + } while (true); } size_t WalTable::size() { @@ -429,13 +359,13 @@ Status WalTable::_get_column_info(int64_t db_id, int64_t tb_id) { } std::vector column_element = result.columns; int64_t column_index = 1; - _column_id_name_map.clear(); - _column_id_index_map.clear(); + _column_id_info_map.clear(); for (auto column : column_element) { - auto column_name = column.columnName; - auto column_id = column.columnId; - _column_id_name_map.emplace(column_id, column_name); - _column_id_index_map.emplace(column_id, column_index); + auto column_name = column.column_name; + auto column_id = column.column_id; + std::shared_ptr column_pair = + std::make_shared(std::make_pair(column_name, column_index)); + _column_id_info_map.emplace(column_id, column_pair); column_index++; } } diff --git a/be/src/olap/wal_table.h b/be/src/olap/wal_table.h index e3d66d577a2f93..251e8d51a61244 100644 --- a/be/src/olap/wal_table.h +++ b/be/src/olap/wal_table.h @@ -24,6 +24,8 @@ #include "gen_cpp/FrontendService.h" #include "gen_cpp/FrontendService_types.h" #include "gen_cpp/HeartbeatService_types.h" +#include "http/action/http_stream.h" +#include "olap/wal_info.h" #include "runtime/exec_env.h" #include "runtime/stream_load/stream_load_context.h" namespace doris { @@ -32,25 +34,32 @@ class WalTable { WalTable(ExecEnv* exec_env, int64_t db_id, int64_t table_id); ~WalTable(); // used when be start and there are wals need to do recovery - void add_wals(std::vector wals); + void add_wal(int64_t wal_id, std::string wal); + void pick_relay_wals(); + Status relay_wal_one_by_one(); Status replay_wals(); size_t size(); void stop(); public: - // - using replay_wal_info = std::tuple; + // + using ColumnInfo = std::pair; private: - Status _get_wal_info(const std::string& wal, std::shared_ptr>&); - std::string _get_tmp_path(const std::string wal); - Status _send_request(int64_t wal_id, const std::string& wal, const std::string& label); - Status _abort_txn(int64_t db_id, int64_t wal_id); + Status _parse_wal_path(const std::string& wal, + std::shared_ptr>&); + Status _rename_to_tmp_path(const std::string wal); + Status _replay_one_txn_with_stremaload(int64_t wal_id, const std::string& wal, + const std::string& label); + Status _try_abort_txn(int64_t db_id, int64_t wal_id); Status _get_column_info(int64_t db_id, int64_t tb_id); Status _read_wal_header(const std::string& wal, std::string& columns); - bool _need_replay(const replay_wal_info& info); + bool _need_replay(std::shared_ptr); Status _replay_wal_internal(const std::string& wal); Status _delete_wal(int64_t wal_id); + Status _construct_sql_str(const std::string& wal, const std::string& label, + std::string& sql_str, std::vector& index_vector); + Status _handle_stream_load(int64_t wal_id, const std::string& wal, const std::string& label); private: ExecEnv* _exec_env; @@ -60,9 +69,10 @@ class WalTable { std::string _split = "_"; mutable std::mutex _replay_wal_lock; // key is wal_id - std::map _replay_wal_map; + std::map> _replay_wal_map; + std::list> _replaying_queue; std::atomic _stop; - std::map _column_id_name_map; - std::map _column_id_index_map; + std::map> _column_id_info_map; + std::shared_ptr _http_stream_action; }; } // namespace doris \ No newline at end of file diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 28df650f13c7b8..acb40ae6c78661 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -51,7 +51,11 @@ Status LoadBlockQueue::add_block(std::shared_ptr block, bool if (block->rows() > 0) { _block_queue.push_back(block); if (write_wal) { - RETURN_IF_ERROR(_v_wal_writer->write_wal(block.get())); + auto st = _v_wal_writer->write_wal(block.get()); + if (!st.ok()) { + _cancel_without_lock(st); + return st; + } } _all_block_queues_bytes->fetch_add(block->bytes(), std::memory_order_relaxed); } @@ -278,11 +282,17 @@ Status GroupCommitTable::_create_group_commit_load( _load_block_queues.emplace(instance_id, load_block_queue); _need_plan_fragment = false; _exec_env->wal_mgr()->add_wal_status_queue(_table_id, txn_id, - WalManager::WAL_STATUS::PREPARE); + WalManager::WalStatus::PREPARE); //create wal - RETURN_IF_ERROR( - load_block_queue->create_wal(_db_id, _table_id, txn_id, label, _exec_env->wal_mgr(), - params.desc_tbl.slotDescriptors, be_exe_version)); + if (!is_pipeline) { + RETURN_IF_ERROR(load_block_queue->create_wal( + _db_id, _table_id, txn_id, label, _exec_env->wal_mgr(), + params.desc_tbl.slotDescriptors, be_exe_version)); + } else { + RETURN_IF_ERROR(load_block_queue->create_wal( + _db_id, _table_id, txn_id, label, _exec_env->wal_mgr(), + pipeline_params.desc_tbl.slotDescriptors, be_exe_version)); + } _cv.notify_all(); } st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, is_pipeline, params, @@ -367,10 +377,8 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ } else { std::string wal_path; RETURN_IF_ERROR(_exec_env->wal_mgr()->get_wal_path(txn_id, wal_path)); - RETURN_IF_ERROR(_exec_env->wal_mgr()->add_recover_wal(db_id, table_id, - std::vector {wal_path})); - _exec_env->wal_mgr()->add_wal_status_queue(table_id, txn_id, - WalManager::WAL_STATUS::REPLAY); + RETURN_IF_ERROR(_exec_env->wal_mgr()->add_recover_wal(db_id, table_id, txn_id, wal_path)); + _exec_env->wal_mgr()->add_wal_status_queue(table_id, txn_id, WalManager::WalStatus::REPLAY); } std::stringstream ss; ss << "finish group commit, db_id=" << db_id << ", table_id=" << table_id << ", label=" << label diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index b530242743e086..e57996af9e12fb 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -139,6 +139,7 @@ class StreamLoadContext { int64_t table_id = -1; int64_t schema_version = -1; std::string label; + std::string sql_str; // optional std::string sub_label; double max_filter_ratio = 0.0; diff --git a/be/src/vec/exec/format/wal/wal_reader.cpp b/be/src/vec/exec/format/wal/wal_reader.cpp index 01846acc04aa18..32fd66cc309629 100644 --- a/be/src/vec/exec/format/wal/wal_reader.cpp +++ b/be/src/vec/exec/format/wal/wal_reader.cpp @@ -88,8 +88,6 @@ void WalReader::string_split(const std::string& str, const std::string& splits, Status WalReader::get_columns(std::unordered_map* name_to_type, std::unordered_set* missing_cols) { RETURN_IF_ERROR(_wal_reader->read_header(_version, _col_ids)); - std::vector col_element; - string_split(_col_ids, ",", col_element); RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->get_wal_column_index(_wal_id, _column_index)); return Status::OK(); } diff --git a/be/src/vec/sink/writer/vwal_writer.cpp b/be/src/vec/sink/writer/vwal_writer.cpp index 2dc945a2a2f5b9..2ab37c340a0512 100644 --- a/be/src/vec/sink/writer/vwal_writer.cpp +++ b/be/src/vec/sink/writer/vwal_writer.cpp @@ -56,7 +56,7 @@ VWalWriter::~VWalWriter() {} Status VWalWriter::init() { RETURN_IF_ERROR(_wal_manager->create_wal_writer(_wal_id, _wal_writer)); - _wal_manager->add_wal_status_queue(_tb_id, _wal_id, WalManager::WAL_STATUS::CREATE); + _wal_manager->add_wal_status_queue(_tb_id, _wal_id, WalManager::WalStatus::CREATE); std::stringstream ss; for (auto slot_desc : _slot_descs) { if (slot_desc.col_unique_id < 0) { diff --git a/be/test/olap/wal_manager_test.cpp b/be/test/olap/wal_manager_test.cpp index 93d8636eb225ec..c5c216f12a795a 100644 --- a/be/test/olap/wal_manager_test.cpp +++ b/be/test/olap/wal_manager_test.cpp @@ -44,8 +44,7 @@ namespace doris { extern TLoadTxnBeginResult k_stream_load_begin_result; -extern Status k_stream_load_plan_status; -extern std::string k_request_line; +extern Status k_stream_load_exec_status; ExecEnv* _env = nullptr; std::string wal_dir = std::string(getenv("DORIS_HOME")) + "/wal_test"; @@ -67,7 +66,6 @@ class WalManagerTest : public testing::Test { _env->_store_paths = {StorePath(std::filesystem::current_path(), 0)}; _env->_wal_manager = WalManager::create_shared(_env, wal_dir); k_stream_load_begin_result = TLoadTxnBeginResult(); - k_stream_load_plan_status = Status::OK(); } void TearDown() override { static_cast(io::global_local_filesystem()->delete_directory(wal_dir)); @@ -88,7 +86,7 @@ class WalManagerTest : public testing::Test { TEST_F(WalManagerTest, recovery_normal) { _env->wal_mgr()->wal_limit_test_bytes = 1099511627776; - k_request_line = "{\"Status\": \"Success\", \"Message\": \"Test\"}"; + k_stream_load_exec_status = Status::OK(); std::string db_id = "1"; int64_t tb_1_id = 1; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java index 0fcd5050fc83cb..da82c4406c323d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java @@ -1110,7 +1110,7 @@ public RedirectStatus getRedirectStatus() { } public void analyzeGroupCommit(Analyzer analyzer) throws AnalysisException { - if (isGroupCommitStreamLoadSql && (targetTable instanceof OlapTable) + if (isGroupCommitStreamLoadSql && targetTable != null && (targetTable instanceof OlapTable) && !((OlapTable) targetTable).getTableProperty().getUseSchemaLightChange()) { throw new AnalysisException( "table light_schema_change is false, can't do http_stream with group commit mode"); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index d672597a0db40f..209947c8b78778 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -688,16 +688,6 @@ struct TStreamLoadWithLoadStatusResult { 6: optional i64 unselected_rows } -struct TCheckWalRequest { - 1: optional i64 wal_id - 2: optional i64 db_id -} - -struct TCheckWalResult { - 1: optional Status.TStatus status - 2: optional bool need_recovery -} - struct TKafkaRLTaskProgress { 1: required map partitionCmtOffset } @@ -1303,8 +1293,8 @@ struct TGetBackendMetaResult { } struct TColumnInfo { - 1: optional string columnName - 2: optional i64 columnId + 1: optional string column_name + 2: optional i64 column_id } struct TGetColumnInfoRequest {