Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[enhancement](group_commit): refactor relay wal code #29183

Merged
merged 7 commits into from
Dec 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions be/src/http/action/http_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) {
} else {
LOG(INFO) << "use a portion of data to request fe to obtain column information";
ctx->is_read_schema = false;
ctx->status = _process_put(req, ctx);
ctx->status = process_put(req, ctx);
}
}

Expand All @@ -272,7 +272,7 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) {
LOG(INFO) << "after all the data has been read and it has not reached 1M, it will execute "
<< "here";
ctx->is_read_schema = false;
ctx->status = _process_put(req, ctx);
ctx->status = process_put(req, ctx);
}
ctx->read_data_cost_nanos += (MonotonicNanos() - start_read_data_time);
}
Expand All @@ -290,11 +290,15 @@ void HttpStreamAction::free_handler_ctx(std::shared_ptr<void> param) {
ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
}

Status HttpStreamAction::_process_put(HttpRequest* http_req,
std::shared_ptr<StreamLoadContext> ctx) {
Status HttpStreamAction::process_put(HttpRequest* http_req,
std::shared_ptr<StreamLoadContext> ctx) {
TStreamLoadPutRequest request;
set_request_auth(&request, ctx->auth);
request.__set_load_sql(http_req->header(HTTP_SQL));
if (http_req != nullptr) {
request.__set_load_sql(http_req->header(HTTP_SQL));
} else {
request.__set_load_sql(ctx->sql_str);
}
request.__set_loadId(ctx->id.to_thrift());
request.__set_label(ctx->label);
if (ctx->group_commit) {
Expand Down Expand Up @@ -330,7 +334,7 @@ Status HttpStreamAction::_process_put(HttpRequest* http_req,
ctx->txn_id = ctx->put_result.params.txn_conf.txn_id;
ctx->label = ctx->put_result.params.import_label;
ctx->put_result.params.__set_wal_id(ctx->wal_id);
if (http_req->header(HTTP_GROUP_COMMIT) == "async_mode") {
if (http_req != nullptr && http_req->header(HTTP_GROUP_COMMIT) == "async_mode") {
if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
size_t content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
Expand Down
2 changes: 1 addition & 1 deletion be/src/http/action/http_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ class HttpStreamAction : public HttpHandler {

void on_chunk_data(HttpRequest* req) override;
void free_handler_ctx(std::shared_ptr<void> ctx) override;
Status process_put(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx);

private:
Status _on_header(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx);
Status _handle(HttpRequest* req, std::shared_ptr<StreamLoadContext> ctx);
Status _process_put(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx);
void _save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx, const std::string& str);
Status _handle_group_commit(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx);

Expand Down
1 change: 1 addition & 0 deletions be/src/olap/wal_dirs_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ size_t WalDirsInfo::get_max_available_size() {

Status WalDirsInfo::update_wal_dir_limit(std::string wal_dir, size_t limit) {
for (const auto& wal_dir_info : _wal_dirs_info_vec) {
LOG(INFO) << "wal_dir_info:" << wal_dir_info->get_wal_dir();
if (wal_dir_info->get_wal_dir() == wal_dir) {
return wal_dir_info->update_wal_dir_limit(limit);
}
Expand Down
41 changes: 41 additions & 0 deletions be/src/olap/wal_info.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "olap/wal_info.h"
namespace doris {
WalInfo::WalInfo(int64_t wal_id, std::string wal_path, int64_t retry_num, int64_t start_time_ms)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: pass by value and use std::move [modernize-pass-by-value]

be/src/olap/wal_info.cpp:18:

- namespace doris {
+ 
+ #include <utility>
+ namespace doris {

be/src/olap/wal_info.cpp:21:

-           _wal_path(wal_path),
+           _wal_path(std::move(wal_path)),

: _wal_id(wal_id),
_wal_path(wal_path),
_retry_num(retry_num),
_start_time_ms(start_time_ms) {}
// Nothing needs to be released by hand: every member cleans up after itself,
// so the compiler-generated destructor is used instead of an empty body.
WalInfo::~WalInfo() = default;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: use '= default' to define a trivial destructor [modernize-use-equals-default]

Suggested change
WalInfo::~WalInfo() {}
WalInfo::~WalInfo() = default;

int64_t WalInfo::get_wal_id() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method 'get_wal_id' can be made const [readability-make-member-function-const]

Suggested change
int64_t WalInfo::get_wal_id() {
int64_t WalInfo::get_wal_id() const {

be/src/olap/wal_info.h:24:

-     int64_t get_wal_id();
+     int64_t get_wal_id() const;

return _wal_id;
}
// Hands back a copy of the stored WAL path; the member itself is not exposed.
std::string WalInfo::get_wal_path() {
    std::string path_copy = _wal_path;
    return path_copy;
}
int64_t WalInfo::get_retry_num() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method 'get_retry_num' can be made const [readability-make-member-function-const]

Suggested change
int64_t WalInfo::get_retry_num() {
int64_t WalInfo::get_retry_num() const {

be/src/olap/wal_info.h:24:

-     int64_t get_retry_num();
+     int64_t get_retry_num() const;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method 'get_retry_num' can be made const [readability-make-member-function-const]

Suggested change
int64_t WalInfo::get_retry_num() {
int64_t WalInfo::get_retry_num() const {

be/src/olap/wal_info.h:25:

-     int64_t get_retry_num();
+     int64_t get_retry_num() const;

return _retry_num;
}
int64_t WalInfo::get_start_time_ms() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method 'get_start_time_ms' can be made const [readability-make-member-function-const]

Suggested change
int64_t WalInfo::get_start_time_ms() {
int64_t WalInfo::get_start_time_ms() const {

be/src/olap/wal_info.h:25:

-     int64_t get_start_time_ms();
+     int64_t get_start_time_ms() const;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method 'get_start_time_ms' can be made const [readability-make-member-function-const]

Suggested change
int64_t WalInfo::get_start_time_ms() {
int64_t WalInfo::get_start_time_ms() const {

be/src/olap/wal_info.h:26:

-     int64_t get_start_time_ms();
+     int64_t get_start_time_ms() const;

return _start_time_ms;
}
// Bumps the retry counter by exactly one.
void WalInfo::add_retry_num() {
    _retry_num += 1;
}
} // namespace doris
38 changes: 38 additions & 0 deletions be/src/olap/wal_info.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "runtime/exec_env.h"

namespace doris {
// Value holder describing a single WAL entry: its id, its path, a retry
// counter and a start timestamp.
class WalInfo {
public:
// Stores each argument in the member of the same name.
WalInfo(int64_t wal_id, std::string wal_path, int64_t retry_num, int64_t start_time_ms);
~WalInfo();
// Returns the WAL id supplied at construction.
int64_t get_wal_id();
// Returns the current value of the retry counter.
int64_t get_retry_num();
// Returns the start time supplied at construction.
// NOTE(review): presumably milliseconds, judging by the name - confirm with callers.
int64_t get_start_time_ms();
// Returns a copy of the stored WAL path.
std::string get_wal_path();
// Increments the retry counter by one.
void add_retry_num();

private:
int64_t _wal_id;
std::string _wal_path;
int64_t _retry_num;
int64_t _start_time_ms;
};

} // namespace doris
41 changes: 19 additions & 22 deletions be/src/olap/wal_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ Status WalManager::init() {
RETURN_IF_ERROR(_init_wal_dirs_conf());
RETURN_IF_ERROR(_init_wal_dirs());
RETURN_IF_ERROR(_init_wal_dirs_info());
for (auto wal_dir : _wal_dirs) {
RETURN_IF_ERROR(scan_wals(wal_dir));
}
return Thread::create(
"WalMgr", "replay_wal", [this]() { static_cast<void>(this->replay()); },
&_replay_thread);
Expand Down Expand Up @@ -112,7 +115,7 @@ Status WalManager::_init_wal_dirs_conf() {
Status WalManager::_init_wal_dirs() {
bool exists = false;
for (auto wal_dir : _wal_dirs) {
std::string tmp_dir = wal_dir + "/tmp";
std::string tmp_dir = wal_dir + "/" + tmp;
LOG(INFO) << "wal_dir:" << wal_dir << ",tmp_dir:" << tmp_dir;
RETURN_IF_ERROR(io::global_local_filesystem()->exists(wal_dir, &exists));
if (!exists) {
Expand All @@ -122,7 +125,6 @@ Status WalManager::_init_wal_dirs() {
if (!exists) {
RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(tmp_dir));
}
RETURN_IF_ERROR(scan_wals(wal_dir));
}
return Status::OK();
}
Expand Down Expand Up @@ -164,15 +166,15 @@ Status WalManager::_init_wal_dirs_info() {
&_update_wal_dirs_info_thread);
}

void WalManager::add_wal_status_queue(int64_t table_id, int64_t wal_id, WAL_STATUS wal_status) {
void WalManager::add_wal_status_queue(int64_t table_id, int64_t wal_id, WalStatus wal_status) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method 'add_wal_status_queue' can be made static [readability-convert-member-functions-to-static]

be/src/olap/wal_manager.h:73:

-     void add_wal_status_queue(int64_t table_id, int64_t wal_id, WalStatus wal_status);
+     static void add_wal_status_queue(int64_t table_id, int64_t wal_id, WalStatus wal_status);

std::lock_guard<std::shared_mutex> wrlock(_wal_status_lock);
LOG(INFO) << "add wal queue "
<< ",table_id:" << table_id << ",wal_id:" << wal_id << ",status:" << wal_status;
auto it = _wal_status_queues.find(table_id);
if (it == _wal_status_queues.end()) {
std::unordered_map<int64_t, WAL_STATUS> tmp;
tmp.emplace(wal_id, wal_status);
_wal_status_queues.emplace(table_id, tmp);
std::unordered_map<int64_t, WalStatus> tmp_map;
tmp_map.emplace(wal_id, wal_status);
_wal_status_queues.emplace(table_id, tmp_map);
} else {
it->second.emplace(wal_id, wal_status);
}
Expand Down Expand Up @@ -305,12 +307,12 @@ Status WalManager::scan_wals(const std::string& wal_path) {
LOG(WARNING) << "Failed list files for dir=" << wal_path << ", st=" << st.to_string();
return st;
}
for (const auto& db_id : dbs) {
if (db_id.is_file) {
for (const auto& database_id : dbs) {
if (database_id.is_file || database_id.file_name == tmp) {
continue;
}
std::vector<io::FileInfo> tables;
auto db_path = wal_path + "/" + db_id.file_name;
auto db_path = wal_path + "/" + database_id.file_name;
st = io::global_local_filesystem()->list(db_path, false, &tables, &exists);
if (!st.ok()) {
LOG(WARNING) << "Failed list files for dir=" << db_path << ", st=" << st.to_string();
Expand Down Expand Up @@ -342,20 +344,16 @@ Status WalManager::scan_wals(const std::string& wal_path) {
int64_t wal_id =
std::strtoll(wal.file_name.substr(0, pos).c_str(), NULL, 10);
_wal_path_map.emplace(wal_id, wal_file);
int64_t db_id = std::strtoll(database_id.file_name.c_str(), NULL, 10);
int64_t tb_id = std::strtoll(table_id.file_name.c_str(), NULL, 10);
add_wal_status_queue(tb_id, wal_id, WalManager::WAL_STATUS::REPLAY);
add_wal_status_queue(tb_id, wal_id, WalManager::WalStatus::REPLAY);
RETURN_IF_ERROR(add_recover_wal(db_id, tb_id, wal_id, wal_file));
} catch (const std::invalid_argument& e) {
return Status::InvalidArgument("Invalid format, {}", e.what());
}
}
}
st = add_recover_wal(std::stoll(db_id.file_name), std::stoll(table_id.file_name), res);
count += res.size();
if (!st.ok()) {
LOG(WARNING) << "Failed add replay wal, db=" << db_id.file_name
<< ", table=" << table_id.file_name << ", st=" << st.to_string();
return st;
}
}
}
LOG(INFO) << "Finish list all wals, size:" << count;
Expand Down Expand Up @@ -396,7 +394,8 @@ Status WalManager::replay() {
return Status::OK();
}

Status WalManager::add_recover_wal(int64_t db_id, int64_t table_id, std::vector<std::string> wals) {
Status WalManager::add_recover_wal(int64_t db_id, int64_t table_id, int64_t wal_id,
std::string wal) {
std::lock_guard<std::shared_mutex> wrlock(_lock);
std::shared_ptr<WalTable> table_ptr;
auto it = _table_map.find(table_id);
Expand All @@ -406,12 +405,10 @@ Status WalManager::add_recover_wal(int64_t db_id, int64_t table_id, std::vector<
} else {
table_ptr = it->second;
}
table_ptr->add_wals(wals);
table_ptr->add_wal(wal_id, wal);
#ifndef BE_TEST
for (auto wal : wals) {
RETURN_IF_ERROR(update_wal_dir_limit(_get_base_wal_path(wal)));
RETURN_IF_ERROR(update_wal_dir_used(_get_base_wal_path(wal)));
}
RETURN_IF_ERROR(update_wal_dir_limit(_get_base_wal_path(wal)));
RETURN_IF_ERROR(update_wal_dir_used(_get_base_wal_path(wal)));
#endif
return Status::OK();
}
Expand Down
17 changes: 12 additions & 5 deletions be/src/olap/wal_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class WalManager {
ENABLE_FACTORY_CREATOR(WalManager);

public:
enum WAL_STATUS {
enum WalStatus {
PREPARE = 0,
REPLAY,
CREATE,
Expand All @@ -56,6 +56,7 @@ class WalManager {
public:
WalManager(ExecEnv* exec_env, const std::string& wal_dir);
~WalManager();
// used for wal
Status delete_wal(int64_t wal_id, size_t block_queue_pre_allocated = 0);
Status init();
Status scan_wals(const std::string& wal_path);
Expand All @@ -64,13 +65,13 @@ class WalManager {
Status create_wal_writer(int64_t wal_id, std::shared_ptr<WalWriter>& wal_writer);
Status scan();
size_t get_wal_table_size(int64_t table_id);
Status add_recover_wal(int64_t db_id, int64_t table_id, std::vector<std::string> wals);
Status add_recover_wal(int64_t db_id, int64_t table_id, int64_t wal_id, std::string wal);
Status add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, const std::string& label,
std::string& base_path);
Status get_wal_path(int64_t wal_id, std::string& wal_path);
Status get_wal_status_queue_size(const PGetWalQueueSizeRequest* request,
PGetWalQueueSizeResponse* response);
void add_wal_status_queue(int64_t table_id, int64_t wal_id, WAL_STATUS wal_status);
void add_wal_status_queue(int64_t table_id, int64_t wal_id, WalStatus wal_status);
Status erase_wal_status_queue(int64_t table_id, int64_t wal_id);
void print_wal_status_queue();
void stop();
Expand All @@ -80,6 +81,7 @@ class WalManager {
void erase_wal_column_index(int64_t wal_id);
Status get_wal_column_index(int64_t wal_id, std::vector<size_t>& column_index);

// used for limit
Status update_wal_dir_limit(const std::string& wal_dir, size_t limit = -1);
Status update_wal_dir_used(const std::string& wal_dir, size_t used = -1);
Status update_wal_dir_pre_allocated(const std::string& wal_dir, size_t pre_allocated,
Expand All @@ -88,6 +90,7 @@ class WalManager {
size_t get_max_available_size();

private:
// used for limit
Status _init_wal_dirs_conf();
Status _init_wal_dirs();
Status _init_wal_dirs_info();
Expand All @@ -99,23 +102,27 @@ class WalManager {
// used for be ut
size_t wal_limit_test_bytes;

const std::string tmp = "tmp";

private:
//used for wal
ExecEnv* _exec_env = nullptr;
std::shared_mutex _lock;
scoped_refptr<Thread> _replay_thread;
scoped_refptr<Thread> _update_wal_dirs_info_thread;
CountDownLatch _stop_background_threads_latch;
std::map<int64_t, std::shared_ptr<WalTable>> _table_map;
std::vector<std::string> _wal_dirs;
std::shared_mutex _wal_lock;
std::shared_mutex _wal_status_lock;
std::unordered_map<int64_t, std::string> _wal_path_map;
std::unordered_map<int64_t, std::shared_ptr<WalWriter>> _wal_id_to_writer_map;
std::unordered_map<int64_t, std::unordered_map<int64_t, WAL_STATUS>> _wal_status_queues;
std::unordered_map<int64_t, std::unordered_map<int64_t, WalStatus>> _wal_status_queues;
std::atomic<bool> _stop;
std::shared_mutex _wal_column_id_map_lock;
std::unordered_map<int64_t, std::vector<size_t>&> _wal_column_id_map;
std::unique_ptr<doris::ThreadPool> _thread_pool;
// used for limit
scoped_refptr<Thread> _update_wal_dirs_info_thread;
std::unique_ptr<WalDirsInfo> _wal_dirs_info;
};
} // namespace doris
Loading
Loading