Skip to content

Commit

Permalink
[enhance](fs) Eliminate the reference of FileReader/FileWriter to FS (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
platoneko authored Mar 21, 2024
1 parent 801f0ef commit 425cb6b
Show file tree
Hide file tree
Showing 98 changed files with 1,727 additions and 1,749 deletions.
162 changes: 105 additions & 57 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
#include "io/fs/hdfs_file_system.h"
#include "io/fs/local_file_system.h"
#include "io/fs/path.h"
#include "io/fs/remote_file_system.h"
#include "io/fs/s3_file_system.h"
#include "olap/cumulative_compaction_time_series_policy.h"
#include "olap/data_dir.h"
Expand Down Expand Up @@ -1076,10 +1077,16 @@ void report_tablet_callback(StorageEngine& engine, const TMasterInfo& master_inf
}
request.__isset.storage_policy = true;
auto& resource_list = request.resource;
for (auto [id, version] : get_storage_resource_ids()) {
for (auto [id_str, version] : get_storage_resource_ids()) {
auto& resource = resource_list.emplace_back();
resource.__set_id(id);
resource.__set_version(version);
int64_t id = -1;
if (auto [_, ec] = std::from_chars(id_str.data(), id_str.data() + id_str.size(), id);
ec == std::errc {}) [[unlikely]] {
LOG(ERROR) << "invalid resource id format: " << id_str;
} else {
resource.__set_id(id);
resource.__set_version(version);
}
}
request.__isset.resource = true;

Expand Down Expand Up @@ -1327,77 +1334,118 @@ void submit_table_compaction_callback(StorageEngine& engine, const TAgentTaskReq
}
}

namespace {

void update_s3_resource(const TStorageResource& param, io::RemoteFileSystemSPtr existed_fs) {
Status st;
io::RemoteFileSystemSPtr fs;

if (!existed_fs) {
// No such FS instance on BE
S3Conf s3_conf {
.bucket = param.s3_storage_param.bucket,
.prefix = param.s3_storage_param.root_path,
.client_conf = {
.endpoint = param.s3_storage_param.endpoint,
.region = param.s3_storage_param.region,
.ak = param.s3_storage_param.ak,
.sk = param.s3_storage_param.sk,
.max_connections = param.s3_storage_param.max_conn,
.request_timeout_ms = param.s3_storage_param.request_timeout_ms,
.connect_timeout_ms = param.s3_storage_param.conn_timeout_ms,
// When using cold heat separation in minio, user might use ip address directly,
// which needs enable use_virtual_addressing to true
.use_virtual_addressing = !param.s3_storage_param.use_path_style,
}};
auto res = io::S3FileSystem::create(std::move(s3_conf), std::to_string(param.id));
if (!res.has_value()) {
st = std::move(res).error();
} else {
fs = std::move(res).value();
}
} else {
DCHECK_EQ(existed_fs->type(), io::FileSystemType::S3) << param.id << ' ' << param.name;
auto client = static_cast<io::S3FileSystem*>(existed_fs.get())->client_holder();
S3ClientConf conf {
.ak = param.s3_storage_param.ak,
.sk = param.s3_storage_param.sk,
};
st = client->reset(conf);
fs = std::move(existed_fs);
}

if (!st.ok()) {
LOG(WARNING) << "update s3 resource failed: " << st;
} else {
LOG_INFO("successfully update hdfs resource")
.tag("resource_id", param.id)
.tag("resource_name", param.name);
put_storage_resource(param.id, {std::move(fs), param.version});
}
}

void update_hdfs_resource(const TStorageResource& param, io::RemoteFileSystemSPtr existed_fs) {
Status st;
io::RemoteFileSystemSPtr fs;

if (!existed_fs) {
// No such FS instance on BE
auto res = io::HdfsFileSystem::create(param.hdfs_storage_param,
param.hdfs_storage_param.fs_name,
std::to_string(param.id), nullptr);
if (!res.has_value()) {
st = std::move(res).error();
} else {
fs = std::move(res).value();
}

} else {
DCHECK_EQ(existed_fs->type(), io::FileSystemType::HDFS) << param.id << ' ' << param.name;
// TODO(plat1ko): update hdfs conf
fs = std::move(existed_fs);
}

if (!st.ok()) {
LOG(WARNING) << "update hdfs resource failed: " << st;
} else {
LOG_INFO("successfully update hdfs resource")
.tag("resource_id", param.id)
.tag("resource_name", param.name);
put_storage_resource(param.id, {std::move(fs), param.version});
}
}

} // namespace

void push_storage_policy_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
const auto& push_storage_policy_req = req.push_storage_policy_req;
// refresh resource
for (auto& resource : push_storage_policy_req.resource) {
auto existed_resource = get_storage_resource(resource.id);
if (existed_resource.version >= resource.version) {
for (auto&& param : push_storage_policy_req.resource) {
auto existed_resource = get_storage_resource(param.id);
if (existed_resource.version >= param.version) {
// Stale request, ignore
continue;
}
if (resource.__isset.s3_storage_param) {
Status st;
S3Conf s3_conf;
s3_conf.ak = std::move(resource.s3_storage_param.ak);
s3_conf.sk = std::move(resource.s3_storage_param.sk);
s3_conf.endpoint = std::move(resource.s3_storage_param.endpoint);
s3_conf.region = std::move(resource.s3_storage_param.region);
s3_conf.prefix = std::move(resource.s3_storage_param.root_path);
s3_conf.bucket = std::move(resource.s3_storage_param.bucket);
s3_conf.connect_timeout_ms = resource.s3_storage_param.conn_timeout_ms;
s3_conf.max_connections = resource.s3_storage_param.max_conn;
s3_conf.request_timeout_ms = resource.s3_storage_param.request_timeout_ms;
// When using cold heat separation in minio, user might use ip address directly,
// which needs enable use_virtual_addressing to true
s3_conf.use_virtual_addressing = !resource.s3_storage_param.use_path_style;
std::shared_ptr<io::S3FileSystem> fs;
if (existed_resource.fs == nullptr) {
st = io::S3FileSystem::create(s3_conf, std::to_string(resource.id), &fs);
} else {
fs = std::static_pointer_cast<io::S3FileSystem>(existed_resource.fs);
fs->set_conf(s3_conf);
}
if (!st.ok()) {
LOG(WARNING) << "update s3 resource failed: " << st;
} else {
LOG_INFO("successfully update s3 resource")
.tag("resource_id", resource.id)
.tag("resource_name", resource.name)
.tag("s3_conf", s3_conf.to_string());
put_storage_resource(resource.id, {std::move(fs), resource.version});
}
} else if (resource.__isset.hdfs_storage_param) {
Status st;
std::shared_ptr<io::HdfsFileSystem> fs;
if (existed_resource.fs == nullptr) {
st = io::HdfsFileSystem::create(resource.hdfs_storage_param,
std::to_string(resource.id), "", nullptr, &fs);
} else {
fs = std::static_pointer_cast<io::HdfsFileSystem>(existed_resource.fs);
}
if (!st.ok()) {
LOG(WARNING) << "update hdfs resource failed: " << st;
} else {
LOG_INFO("successfully update hdfs resource")
.tag("resource_id", resource.id)
.tag("resource_name", resource.name);
put_storage_resource(resource.id, {std::move(fs), resource.version});
}

if (param.__isset.s3_storage_param) {
update_s3_resource(param, std::move(existed_resource.fs));
} else if (param.__isset.hdfs_storage_param) {
update_hdfs_resource(param, std::move(existed_resource.fs));
} else {
LOG(WARNING) << "unknown resource=" << resource;
LOG(WARNING) << "unknown resource=" << param;
}
}
// drop storage policy
for (auto policy_id : push_storage_policy_req.dropped_storage_policy) {
delete_storage_policy(policy_id);
}
// refresh storage policy
for (auto& storage_policy : push_storage_policy_req.storage_policy) {
for (auto&& storage_policy : push_storage_policy_req.storage_policy) {
auto existed_storage_policy = get_storage_policy(storage_policy.id);
if (existed_storage_policy == nullptr ||
existed_storage_policy->version < storage_policy.version) {
auto storage_policy1 = std::make_shared<StoragePolicy>();
storage_policy1->name = std::move(storage_policy.name);
storage_policy1->name = storage_policy.name;
storage_policy1->version = storage_policy.version;
storage_policy1->cooldown_datetime = storage_policy.cooldown_datetime;
storage_policy1->cooldown_ttl = storage_policy.cooldown_ttl;
Expand Down
51 changes: 26 additions & 25 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,6 @@
namespace doris::cloud {
using namespace ErrorCode;

namespace {
constexpr int kBrpcRetryTimes = 3;

static bvar::LatencyRecorder _get_rowset_latency("doris_CloudMetaMgr", "get_rowset");
static bvar::LatencyRecorder g_cloud_commit_txn_resp_redirect_latency(
"cloud_table_stats_report_latency");
} // namespace

Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, int concurrency) {
if (tasks.empty()) {
return Status::OK();
Expand Down Expand Up @@ -129,6 +121,12 @@ Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, int
return status;
}

namespace {
constexpr int kBrpcRetryTimes = 3;

bvar::LatencyRecorder _get_rowset_latency("doris_CloudMetaMgr", "get_rowset");
bvar::LatencyRecorder g_cloud_commit_txn_resp_redirect_latency("cloud_table_stats_report_latency");

class MetaServiceProxy {
public:
static Status get_client(std::shared_ptr<MetaService_Stub>* stub) {
Expand Down Expand Up @@ -293,7 +291,7 @@ static std::string debug_info(const Request& req) {
}
}

static inline std::default_random_engine make_random_engine() {
inline std::default_random_engine make_random_engine() {
return std::default_random_engine(
static_cast<uint32_t>(std::chrono::steady_clock::now().time_since_epoch().count()));
}
Expand All @@ -304,8 +302,8 @@ using MetaServiceMethod = void (MetaService_Stub::*)(::google::protobuf::RpcCont
::google::protobuf::Closure*);

template <typename Request, typename Response>
static Status retry_rpc(std::string_view op_name, const Request& req, Response* res,
MetaServiceMethod<Request, Response> method) {
Status retry_rpc(std::string_view op_name, const Request& req, Response* res,
MetaServiceMethod<Request, Response> method) {
static_assert(std::is_base_of_v<::google::protobuf::Message, Request>);
static_assert(std::is_base_of_v<::google::protobuf::Message, Response>);

Expand Down Expand Up @@ -346,6 +344,8 @@ static Status retry_rpc(std::string_view op_name, const Request& req, Response*
return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg);
}

} // namespace

Status CloudMetaMgr::get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta) {
VLOG_DEBUG << "send GetTabletRequest, tablet_id: " << tablet_id;
TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::get_tablet_meta", Status::OK(), tablet_id,
Expand Down Expand Up @@ -545,10 +545,10 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_
}
}

Status CloudMetaMgr::sync_tablet_delete_bitmap(
CloudTablet* tablet, int64_t old_max_version,
const google::protobuf::RepeatedPtrField<RowsetMetaCloudPB>& rs_metas,
const TabletStatsPB& stats, const TabletIndexPB& idx, DeleteBitmap* delete_bitmap) {
Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_max_version,
std::ranges::range auto&& rs_metas,
const TabletStatsPB& stats, const TabletIndexPB& idx,
DeleteBitmap* delete_bitmap) {
if (rs_metas.empty()) {
return Status::OK();
}
Expand Down Expand Up @@ -813,16 +813,17 @@ Status CloudMetaMgr::get_storage_vault_info(
}

for (const auto& obj_store : resp.obj_info()) {
S3Conf s3_conf;
s3_conf.ak = obj_store.ak();
s3_conf.sk = obj_store.sk();
s3_conf.endpoint = obj_store.endpoint();
s3_conf.region = obj_store.region();
s3_conf.bucket = obj_store.bucket();
s3_conf.prefix = obj_store.prefix();
s3_conf.sse_enabled = obj_store.sse_enabled();
s3_conf.provider = obj_store.provider();
vault_infos->emplace_back(obj_store.id(), std::move(s3_conf));
vault_infos->emplace_back(obj_store.id(),
S3Conf {
.bucket = obj_store.bucket(),
.prefix = obj_store.prefix(),
.client_conf {.endpoint = obj_store.endpoint(),
.region = obj_store.region(),
.ak = obj_store.ak(),
.sk = obj_store.sk()},
.sse_enabled = obj_store.sse_enabled(),
.provider = obj_store.provider(),
});
}
for (const auto& vault : resp.storage_vault()) {
THdfsParams params;
Expand Down
7 changes: 3 additions & 4 deletions be/src/cloud/cloud_meta_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,9 @@ class CloudMetaMgr {
int64_t initiator);

private:
Status sync_tablet_delete_bitmap(
CloudTablet* tablet, int64_t old_max_version,
const google::protobuf::RepeatedPtrField<RowsetMetaCloudPB>& rs_metas,
const TabletStatsPB& stas, const TabletIndexPB& idx, DeleteBitmap* delete_bitmap);
Status sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_max_version,
std::ranges::range auto&& rs_metas, const TabletStatsPB& stats,
const TabletIndexPB& idx, DeleteBitmap* delete_bitmap);
};

} // namespace cloud
Expand Down
Loading

0 comments on commit 425cb6b

Please sign in to comment.