Skip to content

Commit

Permalink
use rcu in DiskManager (vesoft-inc#3917)
Browse files Browse the repository at this point in the history
Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com>
  • Loading branch information
liwenhui-soul and Sophie-Xie authored Feb 23, 2022
1 parent 0963f36 commit d3dbc84
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 40 deletions.
81 changes: 53 additions & 28 deletions src/kvstore/DiskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ DiskManager::DiskManager(const std::vector<std::string>& dataPaths,
try {
// atomic is not copy-constructible
std::vector<std::atomic_uint64_t> freeBytes(dataPaths.size() + 1);
Paths* paths = new Paths();
paths_.store(paths);
size_t index = 0;
for (const auto& path : dataPaths) {
auto absolute = boost::filesystem::absolute(path);
Expand All @@ -27,7 +29,7 @@ DiskManager::DiskManager(const std::vector<std::string>& dataPaths,
}
auto canonical = boost::filesystem::canonical(path);
auto info = boost::filesystem::space(canonical);
dataPaths_.emplace_back(std::move(canonical));
paths->dataPaths_.emplace_back(std::move(canonical));
freeBytes[index++] = info.available;
}
freeBytes_ = std::move(freeBytes);
Expand All @@ -39,21 +41,31 @@ DiskManager::DiskManager(const std::vector<std::string>& dataPaths,
}
}

StatusOr<std::vector<std::string>> DiskManager::path(GraphSpaceID spaceId) {
auto spaceIt = partPath_.find(spaceId);
if (spaceIt == partPath_.end()) {
DiskManager::~DiskManager() {
std::lock_guard<std::mutex> lg(lock_);
Paths* paths = paths_.load(std::memory_order_acquire);
folly::rcu_retire(paths, std::default_delete<Paths>());
}

StatusOr<std::vector<std::string>> DiskManager::path(GraphSpaceID spaceId) const {
folly::rcu_reader guard;
Paths* paths = paths_.load(std::memory_order_acquire);
auto spaceIt = paths->partPath_.find(spaceId);
if (spaceIt == paths->partPath_.end()) {
return Status::Error("Space not found");
}
std::vector<std::string> paths;
std::vector<std::string> pathsRes;
for (const auto& partEntry : spaceIt->second) {
paths.emplace_back(partEntry.first);
pathsRes.emplace_back(partEntry.first);
}
return paths;
return pathsRes;
}

StatusOr<std::string> DiskManager::path(GraphSpaceID spaceId, PartitionID partId) {
auto spaceIt = partPath_.find(spaceId);
if (spaceIt == partPath_.end()) {
StatusOr<std::string> DiskManager::path(GraphSpaceID spaceId, PartitionID partId) const {
folly::rcu_reader guard;
Paths* paths = paths_.load(std::memory_order_acquire);
auto spaceIt = paths->partPath_.find(spaceId);
if (spaceIt == paths->partPath_.end()) {
return Status::Error("Space not found");
}
for (const auto& [path, parts] : spaceIt->second) {
Expand All @@ -67,13 +79,17 @@ StatusOr<std::string> DiskManager::path(GraphSpaceID spaceId, PartitionID partId
void DiskManager::addPartToPath(GraphSpaceID spaceId, PartitionID partId, const std::string& path) {
std::lock_guard<std::mutex> lg(lock_);
try {
Paths* oldPaths = paths_.load(std::memory_order_acquire);
Paths* newPaths = new Paths(*oldPaths);
auto canonical = boost::filesystem::canonical(path);
auto dataPath = canonical.parent_path().parent_path();
dataPath = boost::filesystem::absolute(dataPath);
auto iter = std::find(dataPaths_.begin(), dataPaths_.end(), dataPath);
CHECK(iter != dataPaths_.end());
partIndex_[spaceId][partId] = iter - dataPaths_.begin();
partPath_[spaceId][canonical.string()].emplace(partId);
auto iter = std::find(newPaths->dataPaths_.begin(), newPaths->dataPaths_.end(), dataPath);
CHECK(iter != newPaths->dataPaths_.end());
newPaths->partIndex_[spaceId][partId] = iter - newPaths->dataPaths_.begin();
newPaths->partPath_[spaceId][canonical.string()].emplace(partId);
paths_.store(newPaths, std::memory_order_release);
folly::rcu_retire(oldPaths, std::default_delete<Paths>());
} catch (boost::filesystem::filesystem_error& e) {
LOG(FATAL) << "Invalid path: " << e.what();
}
Expand All @@ -84,21 +100,26 @@ void DiskManager::removePartFromPath(GraphSpaceID spaceId,
const std::string& path) {
std::lock_guard<std::mutex> lg(lock_);
try {
Paths* oldPaths = paths_.load(std::memory_order_acquire);
Paths* newPaths = new Paths(*oldPaths);
auto canonical = boost::filesystem::canonical(path);
auto dataPath = canonical.parent_path().parent_path();
dataPath = boost::filesystem::absolute(dataPath);
auto iter = std::find(dataPaths_.begin(), dataPaths_.end(), dataPath);
CHECK(iter != dataPaths_.end());
partIndex_[spaceId].erase(partId);
partPath_[spaceId][canonical.string()].erase(partId);
auto iter = std::find(newPaths->dataPaths_.begin(), newPaths->dataPaths_.end(), dataPath);
CHECK(iter != newPaths->dataPaths_.end());
newPaths->partIndex_[spaceId].erase(partId);
newPaths->partPath_[spaceId][canonical.string()].erase(partId);
paths_.store(newPaths, std::memory_order_release);
folly::rcu_retire(oldPaths, std::default_delete<Paths>());
} catch (boost::filesystem::filesystem_error& e) {
LOG(FATAL) << "Invalid path: " << e.what();
}
}

void DiskManager::getDiskParts(SpaceDiskPartsMap& diskParts) {
std::lock_guard<std::mutex> lg(lock_);
for (const auto& [space, partDiskMap] : partPath_) {
void DiskManager::getDiskParts(SpaceDiskPartsMap& diskParts) const {
folly::rcu_reader guard;
Paths* paths = paths_.load(std::memory_order_acquire);
for (const auto& [space, partDiskMap] : paths->partPath_) {
std::unordered_map<std::string, meta::cpp2::PartitionList> tmpPartPaths;
for (const auto& [path, partitions] : partDiskMap) {
std::vector<PartitionID> tmpPartitions;
Expand All @@ -113,9 +134,11 @@ void DiskManager::getDiskParts(SpaceDiskPartsMap& diskParts) {
}
}

bool DiskManager::hasEnoughSpace(GraphSpaceID spaceId, PartitionID partId) {
auto spaceIt = partIndex_.find(spaceId);
if (spaceIt == partIndex_.end()) {
bool DiskManager::hasEnoughSpace(GraphSpaceID spaceId, PartitionID partId) const {
folly::rcu_reader guard;
Paths* paths = paths_.load(std::memory_order_acquire);
auto spaceIt = paths->partIndex_.find(spaceId);
if (spaceIt == paths->partIndex_.end()) {
return false;
}
auto partIt = spaceIt->second.find(partId);
Expand All @@ -127,14 +150,16 @@ bool DiskManager::hasEnoughSpace(GraphSpaceID spaceId, PartitionID partId) {

void DiskManager::refresh() {
// refresh the available bytes of each data path, skip the dummy path
for (size_t i = 0; i < dataPaths_.size(); i++) {
folly::rcu_reader guard;
Paths* paths = paths_.load(std::memory_order_acquire);
for (size_t i = 0; i < paths->dataPaths_.size(); i++) {
boost::system::error_code ec;
auto info = boost::filesystem::space(dataPaths_[i], ec);
auto info = boost::filesystem::space(paths->dataPaths_[i], ec);
if (!ec) {
VLOG(2) << "Refresh filesystem info of " << dataPaths_[i];
VLOG(2) << "Refresh filesystem info of " << paths->dataPaths_[i];
freeBytes_[i] = info.available;
} else {
LOG(WARNING) << "Get filesystem info of " << dataPaths_[i] << " failed";
LOG(WARNING) << "Get filesystem info of " << paths->dataPaths_[i] << " failed";
}
}
}
Expand Down
29 changes: 17 additions & 12 deletions src/kvstore/DiskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "common/base/StatusOr.h"
#include "common/thread/GenericWorker.h"
#include "common/thrift/ThriftTypes.h"
#include "folly/synchronization/Rcu.h"
#include "interface/gen-cpp2/meta_types.h"

namespace nebula {
Expand All @@ -41,14 +42,16 @@ class DiskManager {
DiskManager(const std::vector<std::string>& dataPaths,
std::shared_ptr<thread::GenericWorker> bgThread = nullptr);

~DiskManager();

/**
* @brief return canonical data path of given space
*
* @param spaceId
* @return StatusOr<std::vector<std::string>> Canonical path of all which contains the specified
* space, e.g. {"/DataPath1/nebula/spaceId", "/DataPath2/nebula/spaceId" ... }
*/
StatusOr<std::vector<std::string>> path(GraphSpaceID spaceId);
StatusOr<std::vector<std::string>> path(GraphSpaceID spaceId) const;

/**
* @brief Canonical path which contains the specified space and part, e.g.
Expand All @@ -60,7 +63,7 @@ class DiskManager {
* @param partId
* @return StatusOr<std::string> data path of given partId if found, else return error status
*/
StatusOr<std::string> path(GraphSpaceID spaceId, PartitionID partId);
StatusOr<std::string> path(GraphSpaceID spaceId, PartitionID partId) const;

/**
* @brief Add a partition to a given path, called when add a partiton in NebulaStore
Expand Down Expand Up @@ -90,35 +93,37 @@ class DiskManager {
* @return true Data path remains enough space
* @return false Data path does not remain enough space
*/
bool hasEnoughSpace(GraphSpaceID spaceId, PartitionID partId);
bool hasEnoughSpace(GraphSpaceID spaceId, PartitionID partId) const;

/**
* @brief Get all partitions grouped by data path and spaceId
*
* @param diskParts Get all space data path and all partition in the path
*/
void getDiskParts(SpaceDiskPartsMap& diskParts);
void getDiskParts(SpaceDiskPartsMap& diskParts) const;

private:
/**
* @brief Refresh free bytes of data path periodically
*/
void refresh();

struct Paths {
// canonical path of data_path flag
std::vector<boost::filesystem::path> dataPaths_;
// given a space and data path, return all parts in the path
std::unordered_map<GraphSpaceID, PartDiskMap> partPath_;
// the index in dataPaths_ for a given space + part
std::unordered_map<GraphSpaceID, std::unordered_map<PartitionID, size_t>> partIndex_;
};

private:
std::shared_ptr<thread::GenericWorker> bgThread_;

// canonical path of data_path flag
std::vector<boost::filesystem::path> dataPaths_;
std::atomic<Paths*> paths_;
// free space available to a non-privileged process, in bytes
std::vector<std::atomic_uint64_t> freeBytes_;

// given a space and data path, return all parts in the path
std::unordered_map<GraphSpaceID, PartDiskMap> partPath_;

// the index in dataPaths_ for a given space + part
std::unordered_map<GraphSpaceID, std::unordered_map<PartitionID, size_t>> partIndex_;

// lock used to protect partPath_ and partIndex_
std::mutex lock_;
};
Expand Down

0 comments on commit d3dbc84

Please sign in to comment.