-
Notifications
You must be signed in to change notification settings - Fork 3.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[enhancement](group_commit): refector relay wal code #29183
Changes from all commits
c56d03d
4955636
f379361
b908b34
4be61af
461a256
10e8240
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,41 @@ | ||||||||||
// Licensed to the Apache Software Foundation (ASF) under one | ||||||||||
// or more contributor license agreements. See the NOTICE file | ||||||||||
// distributed with this work for additional information | ||||||||||
// regarding copyright ownership. The ASF licenses this file | ||||||||||
// to you under the Apache License, Version 2.0 (the | ||||||||||
// "License"); you may not use this file except in compliance | ||||||||||
// with the License. You may obtain a copy of the License at | ||||||||||
// | ||||||||||
// http://www.apache.org/licenses/LICENSE-2.0 | ||||||||||
// | ||||||||||
// Unless required by applicable law or agreed to in writing, | ||||||||||
// software distributed under the License is distributed on an | ||||||||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||||||||||
// KIND, either express or implied. See the License for the | ||||||||||
// specific language governing permissions and limitations | ||||||||||
// under the License. | ||||||||||
|
||||||||||
#include "olap/wal_info.h" | ||||||||||
namespace doris { | ||||||||||
WalInfo::WalInfo(int64_t wal_id, std::string wal_path, int64_t retry_num, int64_t start_time_ms) | ||||||||||
: _wal_id(wal_id), | ||||||||||
_wal_path(wal_path), | ||||||||||
_retry_num(retry_num), | ||||||||||
_start_time_ms(start_time_ms) {} | ||||||||||
WalInfo::~WalInfo() {} | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. warning: use '= default' to define a trivial destructor [modernize-use-equals-default]
Suggested change
|
||||||||||
int64_t WalInfo::get_wal_id() { | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. warning: method 'get_wal_id' can be made const [readability-make-member-function-const]
Suggested change
be/src/olap/wal_info.h:24: - int64_t get_wal_id();
+ int64_t get_wal_id() const; |
||||||||||
return _wal_id; | ||||||||||
} | ||||||||||
std::string WalInfo::get_wal_path() { | ||||||||||
return _wal_path; | ||||||||||
} | ||||||||||
int64_t WalInfo::get_retry_num() { | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. warning: method 'get_retry_num' can be made const [readability-make-member-function-const]
Suggested change
be/src/olap/wal_info.h:24: - int64_t get_retry_num();
+ int64_t get_retry_num() const; There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. warning: method 'get_retry_num' can be made const [readability-make-member-function-const]
Suggested change
be/src/olap/wal_info.h:25: - int64_t get_retry_num();
+ int64_t get_retry_num() const; |
||||||||||
return _retry_num; | ||||||||||
} | ||||||||||
int64_t WalInfo::get_start_time_ms() { | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. warning: method 'get_start_time_ms' can be made const [readability-make-member-function-const]
Suggested change
be/src/olap/wal_info.h:25: - int64_t get_start_time_ms();
+ int64_t get_start_time_ms() const; There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. warning: method 'get_start_time_ms' can be made const [readability-make-member-function-const]
Suggested change
be/src/olap/wal_info.h:26: - int64_t get_start_time_ms();
+ int64_t get_start_time_ms() const; |
||||||||||
return _start_time_ms; | ||||||||||
} | ||||||||||
void WalInfo::add_retry_num() { | ||||||||||
_retry_num++; | ||||||||||
} | ||||||||||
} // namespace doris |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
#pragma once | ||
#include "runtime/exec_env.h" | ||
|
||
namespace doris { | ||
class WalInfo { | ||
public: | ||
WalInfo(int64_t wal_id, std::string wal_path, int64_t retry_num, int64_t start_time_ms); | ||
~WalInfo(); | ||
int64_t get_wal_id(); | ||
int64_t get_retry_num(); | ||
int64_t get_start_time_ms(); | ||
std::string get_wal_path(); | ||
void add_retry_num(); | ||
|
||
private: | ||
int64_t _wal_id; | ||
std::string _wal_path; | ||
int64_t _retry_num; | ||
int64_t _start_time_ms; | ||
}; | ||
|
||
} // namespace doris |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -82,6 +82,9 @@ Status WalManager::init() { | |
RETURN_IF_ERROR(_init_wal_dirs_conf()); | ||
RETURN_IF_ERROR(_init_wal_dirs()); | ||
RETURN_IF_ERROR(_init_wal_dirs_info()); | ||
for (auto wal_dir : _wal_dirs) { | ||
RETURN_IF_ERROR(scan_wals(wal_dir)); | ||
} | ||
return Thread::create( | ||
"WalMgr", "replay_wal", [this]() { static_cast<void>(this->replay()); }, | ||
&_replay_thread); | ||
|
@@ -112,7 +115,7 @@ Status WalManager::_init_wal_dirs_conf() { | |
Status WalManager::_init_wal_dirs() { | ||
bool exists = false; | ||
for (auto wal_dir : _wal_dirs) { | ||
std::string tmp_dir = wal_dir + "/tmp"; | ||
std::string tmp_dir = wal_dir + "/" + tmp; | ||
LOG(INFO) << "wal_dir:" << wal_dir << ",tmp_dir:" << tmp_dir; | ||
RETURN_IF_ERROR(io::global_local_filesystem()->exists(wal_dir, &exists)); | ||
if (!exists) { | ||
|
@@ -122,7 +125,6 @@ Status WalManager::_init_wal_dirs() { | |
if (!exists) { | ||
RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(tmp_dir)); | ||
} | ||
RETURN_IF_ERROR(scan_wals(wal_dir)); | ||
} | ||
return Status::OK(); | ||
} | ||
|
@@ -164,15 +166,15 @@ Status WalManager::_init_wal_dirs_info() { | |
&_update_wal_dirs_info_thread); | ||
} | ||
|
||
void WalManager::add_wal_status_queue(int64_t table_id, int64_t wal_id, WAL_STATUS wal_status) { | ||
void WalManager::add_wal_status_queue(int64_t table_id, int64_t wal_id, WalStatus wal_status) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. warning: method 'add_wal_status_queue' can be made static [readability-convert-member-functions-to-static] be/src/olap/wal_manager.h:73: - void add_wal_status_queue(int64_t table_id, int64_t wal_id, WalStatus wal_status);
+ static void add_wal_status_queue(int64_t table_id, int64_t wal_id, WalStatus wal_status); |
||
std::lock_guard<std::shared_mutex> wrlock(_wal_status_lock); | ||
LOG(INFO) << "add wal queue " | ||
<< ",table_id:" << table_id << ",wal_id:" << wal_id << ",status:" << wal_status; | ||
auto it = _wal_status_queues.find(table_id); | ||
if (it == _wal_status_queues.end()) { | ||
std::unordered_map<int64_t, WAL_STATUS> tmp; | ||
tmp.emplace(wal_id, wal_status); | ||
_wal_status_queues.emplace(table_id, tmp); | ||
std::unordered_map<int64_t, WalStatus> tmp_map; | ||
tmp_map.emplace(wal_id, wal_status); | ||
_wal_status_queues.emplace(table_id, tmp_map); | ||
} else { | ||
it->second.emplace(wal_id, wal_status); | ||
} | ||
|
@@ -305,12 +307,12 @@ Status WalManager::scan_wals(const std::string& wal_path) { | |
LOG(WARNING) << "Failed list files for dir=" << wal_path << ", st=" << st.to_string(); | ||
return st; | ||
} | ||
for (const auto& db_id : dbs) { | ||
if (db_id.is_file) { | ||
for (const auto& database_id : dbs) { | ||
if (database_id.is_file || database_id.file_name == tmp) { | ||
continue; | ||
} | ||
std::vector<io::FileInfo> tables; | ||
auto db_path = wal_path + "/" + db_id.file_name; | ||
auto db_path = wal_path + "/" + database_id.file_name; | ||
st = io::global_local_filesystem()->list(db_path, false, &tables, &exists); | ||
if (!st.ok()) { | ||
LOG(WARNING) << "Failed list files for dir=" << db_path << ", st=" << st.to_string(); | ||
|
@@ -342,20 +344,16 @@ Status WalManager::scan_wals(const std::string& wal_path) { | |
int64_t wal_id = | ||
std::strtoll(wal.file_name.substr(0, pos).c_str(), NULL, 10); | ||
_wal_path_map.emplace(wal_id, wal_file); | ||
int64_t db_id = std::strtoll(database_id.file_name.c_str(), NULL, 10); | ||
int64_t tb_id = std::strtoll(table_id.file_name.c_str(), NULL, 10); | ||
add_wal_status_queue(tb_id, wal_id, WalManager::WAL_STATUS::REPLAY); | ||
add_wal_status_queue(tb_id, wal_id, WalManager::WalStatus::REPLAY); | ||
RETURN_IF_ERROR(add_recover_wal(db_id, tb_id, wal_id, wal_file)); | ||
} catch (const std::invalid_argument& e) { | ||
return Status::InvalidArgument("Invalid format, {}", e.what()); | ||
} | ||
} | ||
} | ||
st = add_recover_wal(std::stoll(db_id.file_name), std::stoll(table_id.file_name), res); | ||
count += res.size(); | ||
if (!st.ok()) { | ||
LOG(WARNING) << "Failed add replay wal, db=" << db_id.file_name | ||
<< ", table=" << table_id.file_name << ", st=" << st.to_string(); | ||
return st; | ||
} | ||
} | ||
} | ||
LOG(INFO) << "Finish list all wals, size:" << count; | ||
|
@@ -396,7 +394,8 @@ Status WalManager::replay() { | |
return Status::OK(); | ||
} | ||
|
||
Status WalManager::add_recover_wal(int64_t db_id, int64_t table_id, std::vector<std::string> wals) { | ||
Status WalManager::add_recover_wal(int64_t db_id, int64_t table_id, int64_t wal_id, | ||
std::string wal) { | ||
std::lock_guard<std::shared_mutex> wrlock(_lock); | ||
std::shared_ptr<WalTable> table_ptr; | ||
auto it = _table_map.find(table_id); | ||
|
@@ -406,12 +405,10 @@ Status WalManager::add_recover_wal(int64_t db_id, int64_t table_id, std::vector< | |
} else { | ||
table_ptr = it->second; | ||
} | ||
table_ptr->add_wals(wals); | ||
table_ptr->add_wal(wal_id, wal); | ||
#ifndef BE_TEST | ||
for (auto wal : wals) { | ||
RETURN_IF_ERROR(update_wal_dir_limit(_get_base_wal_path(wal))); | ||
RETURN_IF_ERROR(update_wal_dir_used(_get_base_wal_path(wal))); | ||
} | ||
RETURN_IF_ERROR(update_wal_dir_limit(_get_base_wal_path(wal))); | ||
RETURN_IF_ERROR(update_wal_dir_used(_get_base_wal_path(wal))); | ||
#endif | ||
return Status::OK(); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warning: pass by value and use std::move [modernize-pass-by-value]
be/src/olap/wal_info.cpp:18:
be/src/olap/wal_info.cpp:21: