Skip to content

Commit

Permalink
refactor2
Browse files Browse the repository at this point in the history
  • Loading branch information
hust-hhb committed Dec 28, 2023
1 parent 9d50650 commit d3ea597
Show file tree
Hide file tree
Showing 15 changed files with 179 additions and 182 deletions.
74 changes: 9 additions & 65 deletions be/src/http/action/http_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) {
} else {
LOG(INFO) << "use a portion of data to request fe to obtain column information";
ctx->is_read_schema = false;
ctx->status = _process_put(req, ctx);
ctx->status = process_put(req, ctx);
}
}

Expand All @@ -309,7 +309,7 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) {
LOG(INFO) << "after all the data has been read and it has not reached 1M, it will execute "
<< "here";
ctx->is_read_schema = false;
ctx->status = _process_put(req, ctx);
ctx->status = process_put(req, ctx);
}
ctx->read_data_cost_nanos += (MonotonicNanos() - start_read_data_time);
}
Expand All @@ -327,11 +327,15 @@ void HttpStreamAction::free_handler_ctx(std::shared_ptr<void> param) {
ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
}

Status HttpStreamAction::_process_put(HttpRequest* http_req,
Status HttpStreamAction::process_put(HttpRequest* http_req,
std::shared_ptr<StreamLoadContext> ctx) {
TStreamLoadPutRequest request;
set_request_auth(&request, ctx->auth);
request.__set_load_sql(http_req->header(HTTP_SQL));
if (http_req != nullptr) {
request.__set_load_sql(http_req->header(HTTP_SQL));
} else {
request.__set_load_sql(ctx->sql_str);
}
request.__set_loadId(ctx->id.to_thrift());
request.__set_label(ctx->label);
if (ctx->group_commit) {
Expand Down Expand Up @@ -366,7 +370,7 @@ Status HttpStreamAction::_process_put(HttpRequest* http_req,
ctx->txn_id = ctx->put_result.params.txn_conf.txn_id;
ctx->label = ctx->put_result.params.import_label;
ctx->put_result.params.__set_wal_id(ctx->wal_id);
if (http_req->header(HTTP_GROUP_COMMIT) == "async mode") {
if (http_req != nullptr && http_req->header(HTTP_GROUP_COMMIT) == "async mode") {
if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
size_t content_length = 0;
content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
Expand All @@ -387,66 +391,6 @@ Status HttpStreamAction::_process_put(HttpRequest* http_req,
return _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
}

// Replays one WAL through the stream-load path.
//
// Steps: build a TStreamLoadPutRequest carrying `sql_str`/`label`, ask the
// master FE to plan it (streamLoadPut), execute the returned plan fragment,
// wait on the context's future, then commit the transaction when the load
// status is OK or roll it back otherwise.
//
// exec_env: provides master_info() and stream_load_executor().
// wal_id:   stored on the context as wal_id and reused as the auth code.
// sql_str:  load SQL forwarded to the FE.
// label:    label sent with the request; the context label is overwritten
//           with the FE's import_label once planning succeeds.
//
// Returns the first non-OK status encountered (RPC, plan, fragment execution,
// future, load status); otherwise the result of commit_txn.
Status HttpStreamAction::process_wal_relay(ExecEnv* exec_env, int64_t wal_id, std::string& sql_str,
                                           const std::string& label) {
    auto ctx = std::make_shared<StreamLoadContext>(exec_env);
    ctx->wal_id = wal_id;
    ctx->auth.auth_code = wal_id;

    // A fresh load id is generated for the request sent to the FE.
    // NOTE(review): ctx->id is not assigned from load_id here, so the ids
    // logged below may differ from the one the FE sees — confirm.
    UniqueId load_id = UniqueId::gen_uid();
    TUniqueId tload_id;
    tload_id.__set_hi(load_id.hi);
    tload_id.__set_lo(load_id.lo);

    TStreamLoadPutRequest request;
    request.__set_auth_code(ctx->auth.auth_code);
    request.__set_load_sql(sql_str);
    request.__set_loadId(tload_id);
    request.__set_label(label);
    if (exec_env->master_info()->__isset.backend_id) {
        request.__set_backend_id(exec_env->master_info()->backend_id);
    } else {
        LOG(WARNING) << "_exec_env->master_info not set backend_id";
    }

    // plan this load
    TNetworkAddress master_addr = exec_env->master_info()->network_address;
    int64_t stream_load_put_start_time = MonotonicNanos();
    RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
            master_addr.hostname, master_addr.port,
            [&request, ctx](FrontendServiceConnection& client) {
                client->streamLoadPut(ctx->put_result, request);
            }));
    ctx->stream_load_put_cost_nanos = MonotonicNanos() - stream_load_put_start_time;

    Status plan_status(Status::create(ctx->put_result.status));
    if (!plan_status.ok()) {
        LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status << ctx->brief();
        return plan_status;
    }

    // Copy the planning result onto the context before executing it.
    ctx->db = ctx->put_result.params.db_name;
    ctx->table = ctx->put_result.params.table_name;
    ctx->txn_id = ctx->put_result.params.txn_conf.txn_id;
    ctx->label = ctx->put_result.params.import_label;
    ctx->put_result.params.__set_wal_id(ctx->wal_id);

    auto st = exec_env->stream_load_executor()->execute_plan_fragment(ctx);
    if (!st.ok()) {
        LOG(WARNING) << "execute_plan_fragment fail, id=" << ctx->id << ", errmsg=" << st;
        return st;
    }

    // wait stream load finish
    // NOTE(review): a failure returned by the future exits here without
    // calling rollback_txn — confirm this is intended.
    RETURN_IF_ERROR(ctx->future.get());

    if (!ctx->status.ok()) {
        LOG(WARNING) << "handle streaming load failed, id=" << ctx->id
                     << ", errmsg=" << ctx->status;
        exec_env->stream_load_executor()->rollback_txn(ctx.get());
        return ctx->status;
    }
    return exec_env->stream_load_executor()->commit_txn(ctx.get());
}

void HttpStreamAction::_save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx,
const std::string& str) {
auto stream_load_recorder = StorageEngine::instance()->get_stream_load_recorder();
Expand Down
4 changes: 1 addition & 3 deletions be/src/http/action/http_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,12 @@ class HttpStreamAction : public HttpHandler {

void on_chunk_data(HttpRequest* req) override;
void free_handler_ctx(std::shared_ptr<void> ctx) override;
static Status process_wal_relay(ExecEnv* exec_env, int64_t wal_id, std::string& sql_str,
const std::string& label);
Status process_put(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx);

private:
Status _on_header(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx);
Status _handle(HttpRequest* req, std::shared_ptr<StreamLoadContext> ctx);
Status _data_saved_path(HttpRequest* req, std::string* file_path);
Status _process_put(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx);
void _save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx, const std::string& str);
void _parse_format(const std::string& format_str, const std::string& compress_type_str,
TFileFormatType::type* format_type, TFileCompressType::type* compress_type);
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/wal_dirs_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,9 @@ size_t WalDirsInfo::get_max_available_size() {
}

Status WalDirsInfo::update_wal_dir_limit(std::string wal_dir, size_t limit) {
LOG(INFO) << "wal_dir:" << wal_dir;
for (const auto& wal_dir_info : _wal_dirs_info_vec) {
LOG(INFO) << "wal_dir_info:" << wal_dir_info->get_wal_dir();
if (wal_dir_info->get_wal_dir() == wal_dir) {
return wal_dir_info->update_wal_dir_limit(limit);
}
Expand Down
14 changes: 7 additions & 7 deletions be/src/olap/wal_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@

#include "olap/wal_info.h"
namespace doris {
WalInfo::WalInfo(std::string wal_path, int64_t retry_num, int64_t start_time_ms, bool relaying)
: _wal_path(wal_path),
WalInfo::WalInfo(int64_t wal_id, std::string wal_path, int64_t retry_num, int64_t start_time_ms)
: _wal_id(wal_id),
_wal_path(wal_path),
_retry_num(retry_num),
_start_time_ms(start_time_ms),
_relaying(relaying) {}
_start_time_ms(start_time_ms) {}
// Empty destructor: the body performs no explicit cleanup.
WalInfo::~WalInfo() {}
// Returns the WAL id this WalInfo was constructed with (_wal_id).
int64_t WalInfo::get_wal_id() {
return _wal_id;
}
// Returns a copy of the stored WAL path (_wal_path).
std::string WalInfo::get_wal_path() {
return _wal_path;
}
Expand All @@ -32,9 +35,6 @@ int64_t WalInfo::get_retry_num() {
// Returns the stored start time (_start_time_ms); the name indicates milliseconds.
int64_t WalInfo::get_start_time_ms() {
return _start_time_ms;
}
// Returns the stored _relaying flag.
bool WalInfo::get_relaying() {
return _relaying;
}
// Increments the retry counter (_retry_num) by one.
void WalInfo::add_retry_num() {
_retry_num++;
}
Expand Down
6 changes: 3 additions & 3 deletions be/src/olap/wal_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,19 @@
namespace doris {
class WalInfo {
public:
WalInfo(std::string wal_path, int64_t retry_num, int64_t start_time_ms, bool relaying);
WalInfo(int64_t wal_id, std::string wal_path, int64_t retry_num, int64_t start_time_ms);
~WalInfo();
int64_t get_wal_id();
int64_t get_retry_num();
int64_t get_start_time_ms();
std::string get_wal_path();
bool get_relaying();
void add_retry_num();

private:
int64_t _wal_id;
std::string _wal_path;
int64_t _retry_num;
int64_t _start_time_ms;
bool _relaying;
};

} // namespace doris
48 changes: 26 additions & 22 deletions be/src/olap/wal_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ Status WalManager::init() {
RETURN_IF_ERROR(_init_wal_dirs_conf());
RETURN_IF_ERROR(_init_wal_dirs());
RETURN_IF_ERROR(_init_wal_dirs_info());
for (auto wal_dir : _wal_dirs) {
RETURN_IF_ERROR(scan_wals(wal_dir));
}
return Thread::create(
"WalMgr", "replay_wal", [this]() { static_cast<void>(this->replay()); },
&_replay_thread);
Expand Down Expand Up @@ -112,7 +115,7 @@ Status WalManager::_init_wal_dirs_conf() {
Status WalManager::_init_wal_dirs() {
bool exists = false;
for (auto wal_dir : _wal_dirs) {
std::string tmp_dir = wal_dir + "/tmp";
std::string tmp_dir = wal_dir + "/" + tmp;
LOG(INFO) << "wal_dir:" << wal_dir << ",tmp_dir:" << tmp_dir;
RETURN_IF_ERROR(io::global_local_filesystem()->exists(wal_dir, &exists));
if (!exists) {
Expand All @@ -122,7 +125,6 @@ Status WalManager::_init_wal_dirs() {
if (!exists) {
RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(tmp_dir));
}
RETURN_IF_ERROR(scan_wals(wal_dir));
}
return Status::OK();
}
Expand Down Expand Up @@ -164,15 +166,15 @@ Status WalManager::_init_wal_dirs_info() {
&_update_wal_dirs_info_thread);
}

void WalManager::add_wal_status_queue(int64_t table_id, int64_t wal_id, WAL_STATUS wal_status) {
void WalManager::add_wal_status_queue(int64_t table_id, int64_t wal_id, WalStatus wal_status) {
std::lock_guard<std::shared_mutex> wrlock(_wal_status_lock);
LOG(INFO) << "add wal queue "
<< ",table_id:" << table_id << ",wal_id:" << wal_id << ",status:" << wal_status;
auto it = _wal_status_queues.find(table_id);
if (it == _wal_status_queues.end()) {
std::unordered_map<int64_t, WAL_STATUS> tmp;
tmp.emplace(wal_id, wal_status);
_wal_status_queues.emplace(table_id, tmp);
std::unordered_map<int64_t, WalStatus> tmp_map;
tmp_map.emplace(wal_id, wal_status);
_wal_status_queues.emplace(table_id, tmp_map);
} else {
it->second.emplace(wal_id, wal_status);
}
Expand Down Expand Up @@ -305,12 +307,13 @@ Status WalManager::scan_wals(const std::string& wal_path) {
LOG(WARNING) << "Failed list files for dir=" << wal_path << ", st=" << st.to_string();
return st;
}
for (const auto& db_id : dbs) {
if (db_id.is_file) {
for (const auto& database_id : dbs) {
LOG(INFO) << "file:" << database_id.file_name;
if (database_id.is_file || database_id.file_name == tmp) {
continue;
}
std::vector<io::FileInfo> tables;
auto db_path = wal_path + "/" + db_id.file_name;
auto db_path = wal_path + "/" + database_id.file_name;
st = io::global_local_filesystem()->list(db_path, false, &tables, &exists);
if (!st.ok()) {
LOG(WARNING) << "Failed list files for dir=" << db_path << ", st=" << st.to_string();
Expand Down Expand Up @@ -342,20 +345,22 @@ Status WalManager::scan_wals(const std::string& wal_path) {
int64_t wal_id =
std::strtoll(wal.file_name.substr(0, pos).c_str(), NULL, 10);
_wal_path_map.emplace(wal_id, wal_file);
int64_t db_id = std::strtoll(database_id.file_name.c_str(), NULL, 10);
int64_t tb_id = std::strtoll(table_id.file_name.c_str(), NULL, 10);
add_wal_status_queue(tb_id, wal_id, WalManager::WAL_STATUS::REPLAY);
add_wal_status_queue(tb_id, wal_id, WalManager::WalStatus::REPLAY);
RETURN_IF_ERROR(add_recover_wal(db_id, tb_id, wal_id, wal_file));
} catch (const std::invalid_argument& e) {
return Status::InvalidArgument("Invalid format, {}", e.what());
}
}
}
st = add_recover_wal(std::stoll(db_id.file_name), std::stoll(table_id.file_name), res);
// st = add_recover_wal(std::stoll(db_id.file_name), std::stoll(table_id.file_name), res);
count += res.size();
if (!st.ok()) {
LOG(WARNING) << "Failed add replay wal, db=" << db_id.file_name
<< ", table=" << table_id.file_name << ", st=" << st.to_string();
return st;
}
// if (!st.ok()) {
// LOG(WARNING) << "Failed add replay wal, db=" << db_id.file_name
// << ", table=" << table_id.file_name << ", st=" << st.to_string();
// return st;
// }
}
}
LOG(INFO) << "Finish list all wals, size:" << count;
Expand Down Expand Up @@ -396,7 +401,8 @@ Status WalManager::replay() {
return Status::OK();
}

Status WalManager::add_recover_wal(int64_t db_id, int64_t table_id, std::vector<std::string> wals) {
Status WalManager::add_recover_wal(int64_t db_id, int64_t table_id, int64_t wal_id,
std::string wal) {
std::lock_guard<std::shared_mutex> wrlock(_lock);
std::shared_ptr<WalTable> table_ptr;
auto it = _table_map.find(table_id);
Expand All @@ -406,12 +412,10 @@ Status WalManager::add_recover_wal(int64_t db_id, int64_t table_id, std::vector<
} else {
table_ptr = it->second;
}
table_ptr->add_wals(wals);
table_ptr->add_wal(wal_id, wal);
#ifndef BE_TEST
for (auto wal : wals) {
RETURN_IF_ERROR(update_wal_dir_limit(_get_base_wal_path(wal)));
RETURN_IF_ERROR(update_wal_dir_used(_get_base_wal_path(wal)));
}
RETURN_IF_ERROR(update_wal_dir_limit(_get_base_wal_path(wal)));
RETURN_IF_ERROR(update_wal_dir_used(_get_base_wal_path(wal)));
#endif
return Status::OK();
}
Expand Down
17 changes: 12 additions & 5 deletions be/src/olap/wal_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class WalManager {
ENABLE_FACTORY_CREATOR(WalManager);

public:
enum WAL_STATUS {
enum WalStatus {
PREPARE = 0,
REPLAY,
CREATE,
Expand All @@ -56,6 +56,7 @@ class WalManager {
public:
WalManager(ExecEnv* exec_env, const std::string& wal_dir);
~WalManager();
// used for wal
Status delete_wal(int64_t wal_id, size_t block_queue_pre_allocated = 0);
Status init();
Status scan_wals(const std::string& wal_path);
Expand All @@ -64,13 +65,13 @@ class WalManager {
Status create_wal_writer(int64_t wal_id, std::shared_ptr<WalWriter>& wal_writer);
Status scan();
size_t get_wal_table_size(int64_t table_id);
Status add_recover_wal(int64_t db_id, int64_t table_id, std::vector<std::string> wals);
Status add_recover_wal(int64_t db_id, int64_t table_id, int64_t wal_id, std::string wal);
Status add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, const std::string& label,
std::string& base_path);
Status get_wal_path(int64_t wal_id, std::string& wal_path);
Status get_wal_status_queue_size(const PGetWalQueueSizeRequest* request,
PGetWalQueueSizeResponse* response);
void add_wal_status_queue(int64_t table_id, int64_t wal_id, WAL_STATUS wal_status);
void add_wal_status_queue(int64_t table_id, int64_t wal_id, WalStatus wal_status);
Status erase_wal_status_queue(int64_t table_id, int64_t wal_id);
void print_wal_status_queue();
void stop();
Expand All @@ -80,6 +81,7 @@ class WalManager {
void erase_wal_column_index(int64_t wal_id);
Status get_wal_column_index(int64_t wal_id, std::vector<size_t>& column_index);

// used for limit
Status update_wal_dir_limit(const std::string& wal_dir, size_t limit = -1);
Status update_wal_dir_used(const std::string& wal_dir, size_t used = -1);
Status update_wal_dir_pre_allocated(const std::string& wal_dir, size_t pre_allocated,
Expand All @@ -88,6 +90,7 @@ class WalManager {
size_t get_max_available_size();

private:
// used for limit
Status _init_wal_dirs_conf();
Status _init_wal_dirs();
Status _init_wal_dirs_info();
Expand All @@ -99,23 +102,27 @@ class WalManager {
// used for be ut
size_t wal_limit_test_bytes;

const std::string tmp = "tmp";

private:
//used for wal
ExecEnv* _exec_env = nullptr;
std::shared_mutex _lock;
scoped_refptr<Thread> _replay_thread;
scoped_refptr<Thread> _update_wal_dirs_info_thread;
CountDownLatch _stop_background_threads_latch;
std::map<int64_t, std::shared_ptr<WalTable>> _table_map;
std::vector<std::string> _wal_dirs;
std::shared_mutex _wal_lock;
std::shared_mutex _wal_status_lock;
std::unordered_map<int64_t, std::string> _wal_path_map;
std::unordered_map<int64_t, std::shared_ptr<WalWriter>> _wal_id_to_writer_map;
std::unordered_map<int64_t, std::unordered_map<int64_t, WAL_STATUS>> _wal_status_queues;
std::unordered_map<int64_t, std::unordered_map<int64_t, WalStatus>> _wal_status_queues;
std::atomic<bool> _stop;
std::shared_mutex _wal_column_id_map_lock;
std::unordered_map<int64_t, std::vector<size_t>&> _wal_column_id_map;
std::unique_ptr<doris::ThreadPool> _thread_pool;
// used for limit
scoped_refptr<Thread> _update_wal_dirs_info_thread;
std::unique_ptr<WalDirsInfo> _wal_dirs_info;
};
} // namespace doris
Loading

0 comments on commit d3ea597

Please sign in to comment.