Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

curvebs/chunkserver: optimize sync chunk when write in bulk #1912

Merged
merged 1 commit into from
Dec 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions conf/chunkserver.conf
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ copyset.raft_snapshot_uri=curve://./0/copysets # __CURVEADM_TEMPLATE__ curve://
copyset.recycler_uri=local://./0/recycler # __CURVEADM_TEMPLATE__ local://${prefix}/data/recycler __CURVEADM_TEMPLATE__
# chunkserver启动时,copyset并发加载的阈值,为0则表示不做限制
copyset.load_concurrency=10
# number of threads the chunkserver uses to sync copyset chunks
copyset.sync_concurrency=20
# 检查copyset是否加载完成出现异常时的最大重试次数
copyset.check_retrytimes=3
# 当前peer的applied_index与leader上的committed_index差距小于该值
Expand All @@ -107,8 +109,12 @@ copyset.scan_rpc_retry_times=3
copyset.scan_rpc_retry_interval_us=100000
# enable O_DSYNC when open chunkfile
copyset.enable_odsync_when_open_chunkfile=false
# sync timer timeout interval
copyset.synctimer_interval_ms=30000
# sync trigger seconds
copyset.sync_trigger_seconds=25
# sync chunk limit default = 2MB
copyset.sync_chunk_limits=2097152
# if the total bytes written within 30s exceed sync_threshold, sync_chunk_limits is doubled
copyset.sync_threshold=65536
# check syncing interval
copyset.check_syncing_interval_ms=500

Expand Down
10 changes: 8 additions & 2 deletions conf/chunkserver.conf.example
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ copyset.raft_snapshot_uri=curve://./0/copysets
copyset.recycler_uri=local://./0/recycler
# chunkserver启动时,copyset并发加载的阈值,为0则表示不做限制
copyset.load_concurrency=10
# number of threads the chunkserver uses to sync copyset chunks
copyset.sync_concurrency=20
# 检查copyset是否加载完成出现异常时的最大重试次数
copyset.check_retrytimes=3
# 当前peer的applied_index与leader上的committed_index差距小于该值
Expand All @@ -107,8 +109,12 @@ copyset.scan_rpc_retry_times=3
copyset.scan_rpc_retry_interval_us=100000
# enable O_DSYNC when open chunkfile
copyset.enable_odsync_when_open_chunkfile=false
# sync timer timeout interval
copyset.synctimer_interval_ms=30000
# sync trigger seconds
copyset.sync_trigger_seconds=25
# sync chunk limit default = 2MB
copyset.sync_chunk_limits=2097152
# if the total bytes written within 30s exceed sync_threshold, sync_chunk_limits is doubled
copyset.sync_threshold=65536
# check syncing interval
copyset.check_syncing_interval_ms=500

Expand Down
11 changes: 9 additions & 2 deletions src/chunkserver/chunkserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "src/chunkserver/braft_cli_service.h"
#include "src/chunkserver/braft_cli_service2.h"
#include "src/chunkserver/chunkserver_helper.h"
#include "src/common/concurrent/task_thread_pool.h"
#include "src/common/uri_parser.h"
#include "src/chunkserver/raftsnapshot/curve_snapshot_attachment.h"
#include "src/chunkserver/raftsnapshot/curve_file_service.h"
Expand Down Expand Up @@ -587,15 +588,21 @@ void ChunkServer::InitCopysetNodeOptions(
&copysetNodeOptions->finishLoadMargin));
LOG_IF(FATAL, !conf->GetUInt32Value("copyset.check_loadmargin_interval_ms",
&copysetNodeOptions->checkLoadMarginIntervalMs));
LOG_IF(FATAL, !conf->GetUInt32Value("copyset.sync_concurrency",
legionxiong marked this conversation as resolved.
Show resolved Hide resolved
&copysetNodeOptions->syncConcurrency));

LOG_IF(FATAL, !conf->GetBoolValue(
"copyset.enable_odsync_when_open_chunkfile",
&copysetNodeOptions->enableOdsyncWhenOpenChunkFile));
if (!copysetNodeOptions->enableOdsyncWhenOpenChunkFile) {
LOG_IF(FATAL, !conf->GetUInt32Value("copyset.synctimer_interval_ms",
&copysetNodeOptions->syncTimerIntervalMs));
LOG_IF(FATAL, !conf->GetUInt64Value("copyset.sync_chunk_limits",
&copysetNodeOptions->syncChunkLimit));
LOG_IF(FATAL, !conf->GetUInt64Value("copyset.sync_threshold",
&copysetNodeOptions->syncThreshold));
LOG_IF(FATAL, !conf->GetUInt32Value("copyset.check_syncing_interval_ms",
&copysetNodeOptions->checkSyncingIntervalMs));
LOG_IF(FATAL, !conf->GetUInt32Value("copyset.sync_trigger_seconds",
&copysetNodeOptions->syncTriggerSeconds));
}
}

Expand Down
10 changes: 8 additions & 2 deletions src/chunkserver/config_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ struct CopysetNodeOptions {

// 限制chunkserver启动时copyset并发恢复加载的数量,为0表示不限制
uint32_t loadConcurrency = 0;
// chunkserver sync_thread_pool number of threads.
uint32_t syncConcurrency = 20;
// copyset trigger sync timeout
uint32_t syncTriggerSeconds = 25;
// 检查copyset是否加载完成出现异常时的最大重试次数
// 可能的异常:1.当前大多数副本还没起来;2.网络问题等导致无法获取leader
// 3.其他的原因导致无法获取到leader的committed index
Expand All @@ -123,8 +127,10 @@ struct CopysetNodeOptions {

// enable O_DSYNC when open chunkfile
bool enableOdsyncWhenOpenChunkFile = false;
// sync timer timeout interval
uint32_t syncTimerIntervalMs = 30000u;
// syncChunkLimit default limit
uint64_t syncChunkLimit = 2 * 1024 * 1024;
// syncThreshold default value = 64k
uint64_t syncThreshold = 64 * 1024;
// check syncing interval
uint32_t checkSyncingIntervalMs = 500u;

Expand Down
73 changes: 49 additions & 24 deletions src/chunkserver/copyset_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,13 @@
#include <future>
#include <deque>
#include <set>
#include <chrono>
#include <condition_variable>

#include "src/chunkserver/raftsnapshot/curve_filesystem_adaptor.h"
#include "src/chunkserver/chunk_closure.h"
#include "src/chunkserver/op_request.h"
#include "src/common/concurrent/task_thread_pool.h"
#include "src/fs/fs_common.h"
#include "src/chunkserver/copyset_node_manager.h"
#include "src/chunkserver/datastore/define.h"
Expand All @@ -54,6 +57,10 @@ using curve::fs::FileSystemInfo;

const char *kCurveConfEpochFilename = "conf.epoch";

uint32_t CopysetNode::syncTriggerSeconds_ = 25;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why use global variables?

std::shared_ptr<common::TaskThreadPool<>>
CopysetNode::copysetSyncPool_ = nullptr;

CopysetNode::CopysetNode(const LogicPoolID &logicPoolId,
const CopysetID &copysetId,
const Configuration &initConf) :
Expand All @@ -73,7 +80,6 @@ CopysetNode::CopysetNode(const LogicPoolID &logicPoolId,
lastSnapshotIndex_(0),
configChange_(std::make_shared<ConfigurationChange>()),
enableOdsyncWhenOpenChunkFile_(false),
syncTimerIntervalMs_(30000),
isSyncing_(false),
checkSyncingIntervalMs_(500) {
}
Expand Down Expand Up @@ -134,6 +140,16 @@ int CopysetNode::Init(const CopysetNodeOptions &options) {
<< "Copyset: " << GroupIdString();
return -1;
}
enableOdsyncWhenOpenChunkFile_ = options.enableOdsyncWhenOpenChunkFile;
if (!enableOdsyncWhenOpenChunkFile_) {
syncThread_.Init(this);
dataStore_->SetCacheCondPtr(syncThread_.cond_);
dataStore_->SetCacheLimits(options.syncChunkLimit,
options.syncThreshold);
LOG(INFO) << "init sync thread success limit = "
<< options.syncChunkLimit <<
"syncthreshold = " << options.syncThreshold;
}

recyclerUri_ = options.recyclerUri;

Expand Down Expand Up @@ -180,9 +196,7 @@ int CopysetNode::Init(const CopysetNodeOptions &options) {
// without using global variables.
StoreOptForCurveSegmentLogStorage(lsOptions);

syncTimerIntervalMs_ = options.syncTimerIntervalMs;
checkSyncingIntervalMs_ = options.checkSyncingIntervalMs;
enableOdsyncWhenOpenChunkFile_ = options.enableOdsyncWhenOpenChunkFile;

return 0;
}
Expand All @@ -194,14 +208,7 @@ int CopysetNode::Run() {
<< "Copyset: " << GroupIdString();
return -1;
}

if (!enableOdsyncWhenOpenChunkFile_) {
CHECK_EQ(0, syncTimer_.init(this, syncTimerIntervalMs_));
LOG(INFO) << "Init sync timer success, interval = "
<< syncTimerIntervalMs_;

syncTimer_.start();
}
syncThread_.Run();

LOG(INFO) << "Run copyset success."
<< "Copyset: " << GroupIdString();
Expand All @@ -210,7 +217,7 @@ int CopysetNode::Run() {

void CopysetNode::Fini() {
if (!enableOdsyncWhenOpenChunkFile_) {
syncTimer_.destroy();
syncThread_.Stop();
}

WaitSnapshotDone();
Expand Down Expand Up @@ -1007,32 +1014,50 @@ void CopysetNode::SyncAllChunks() {
curve::common::LockGuard lg(chunkIdsLock_);
temp.swap(chunkIdsToSync_);
}
size_t total = temp.size();
std::set<ChunkID> chunkIds;
for (auto chunkId : temp) {
chunkIds.insert(chunkId);
}
for (ChunkID chunk : chunkIds) {
CSErrorCode r = dataStore_->SyncChunk(chunk);
if (r != CSErrorCode::Success) {
LOG(FATAL) << "Sync Chunk failed in Copyset: "
copysetSyncPool_->Enqueue([=]() {
CSErrorCode r = dataStore_->SyncChunk(chunk);
if (r != CSErrorCode::Success) {
LOG(FATAL) << "Sync Chunk failed in Copyset: "
<< GroupIdString()
<< ", chunkid: " << chunk
<< " data store return: " << r;
}
}
});
}
}

int SyncTimer::init(CopysetNode *node, int timeoutMs) {
if (RepeatedTimerTask::init(timeoutMs) != 0) {
return -1;
}
void SyncChunkThread::Init(CopysetNode* node) {
running_ = true;
node_ = node;
return 0;
cond_ = std::make_shared<std::condition_variable>();
}

/**
 * Start the background thread that periodically syncs the copyset's chunks.
 *
 * The thread sleeps on cond_ for at most CopysetNode::syncTriggerSeconds_
 * seconds and then calls node_->SyncAllChunks(); it repeats until Stop()
 * clears running_. A notification on cond_ ends the sleep early.
 *
 * This is a no-op when Init() has not been called (cond_ is still null) or
 * when the thread has already been started.
 */
void SyncChunkThread::Run() {
    // Init() is what creates cond_ and sets node_/running_. Without it the
    // worker below would dereference a null condition variable.
    if (cond_ == nullptr) {
        return;
    }
    // Assigning to a joinable std::thread calls std::terminate().
    if (syncThread_.joinable()) {
        return;
    }

    syncThread_ = std::thread([this]() {
        // mtx_ stays locked whenever running_ is examined; wait_for()
        // releases it only while sleeping. Stop() flips running_ under the
        // same mutex, so its notification cannot slip in between the check
        // and the wait.
        std::unique_lock<std::mutex> lock(mtx_);
        while (running_) {
            cond_->wait_for(lock,
                std::chrono::seconds(CopysetNode::syncTriggerSeconds_));
            // Sync after every wake-up, including the one caused by Stop(),
            // so one last sync pass runs before the thread exits.
            node_->SyncAllChunks();
        }
    });
}

/**
 * Ask the background thread to exit and wait for it to finish.
 *
 * Safe to call more than once and safe to call when Run() never started a
 * thread: in both cases syncThread_ is not joinable and cond_ is not touched.
 */
void SyncChunkThread::Stop() {
    {
        // Publish the flag under mtx_ so the worker either sees it before
        // going to sleep or is already waiting when notify_one() fires.
        std::lock_guard<std::mutex> guard(mtx_);
        running_ = false;
    }
    if (syncThread_.joinable()) {
        cond_->notify_one();
        syncThread_.join();
    }
}

void SyncTimer::run() {
node_->HandleSyncTimerOut();
SyncChunkThread::~SyncChunkThread() {
Stop();
}

} // namespace chunkserver
Expand Down
38 changes: 23 additions & 15 deletions src/chunkserver/copyset_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@

#include <butil/memory/ref_counted.h>
#include <braft/repeated_timer_task.h>
#include <bthread/condition_variable.h>

#include <condition_variable>
#include <string>
#include <vector>
#include <climits>
Expand All @@ -41,6 +43,7 @@
#include "src/chunkserver/raftsnapshot/define.h"
#include "src/chunkserver/raftsnapshot/curve_snapshot_writer.h"
#include "src/common/string_util.h"
#include "src/common/concurrent/task_thread_pool.h"
#include "src/chunkserver/raft_node.h"
#include "proto/heartbeat.pb.h"
#include "proto/chunk.pb.h"
Expand All @@ -54,6 +57,7 @@ using ::google::protobuf::RpcController;
using ::google::protobuf::Closure;
using ::curve::mds::heartbeat::ConfigChangeType;
using ::curve::common::Peer;
using ::curve::common::TaskThreadPool;

class CopysetNodeManager;

Expand Down Expand Up @@ -104,18 +108,20 @@ class ConfigurationChangeDone : public braft::Closure {

class CopysetNode;

class SyncTimer : public braft::RepeatedTimerTask {
class SyncChunkThread : public curve::common::Uncopyable {
public:
SyncTimer() : node_(nullptr) {}
virtual ~SyncTimer() {}

int init(CopysetNode *node, int timeoutMs);

void run() override;

protected:
void on_destroy() override {}
CopysetNode *node_;
friend class CopysetNode;
SyncChunkThread() = default;
~SyncChunkThread();
void Run();
void Init(CopysetNode* node);
void Stop();
private:
bool running_;
std::mutex mtx_;
std::shared_ptr<std::condition_variable> cond_;
std::thread syncThread_;
CopysetNode* node_;
};

/**
Expand Down Expand Up @@ -406,6 +412,10 @@ class CopysetNode : public braft::StateMachine,
* better for test
*/
public:
// sync trigger seconds
static uint32_t syncTriggerSeconds_;
// sync thread pool shared by all copyset nodes
static std::shared_ptr<TaskThreadPool<>> copysetSyncPool_;
/**
* 从文件中解析copyset配置版本信息
* @param filePath:文件路径
Expand Down Expand Up @@ -501,10 +511,8 @@ class CopysetNode : public braft::StateMachine,

// enable O_DSYNC when open file
bool enableOdsyncWhenOpenChunkFile_;
// sync chunk timer
SyncTimer syncTimer_;
// sync timer timeout interval
uint32_t syncTimerIntervalMs_;
// sync chunk thread
SyncChunkThread syncThread_;
// chunkIds need to sync
std::deque<ChunkID> chunkIdsToSync_;
// lock for chunkIdsToSync_
Expand Down
13 changes: 9 additions & 4 deletions src/chunkserver/copyset_node_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
#include <string>
#include <utility>

#include "src/chunkserver/config_info.h"
#include "src/chunkserver/copyset_node.h"
#include "src/common/concurrent/task_thread_pool.h"
#include "src/common/string_util.h"
#include "src/common/timeutility.h"
#include "src/chunkserver/chunk_service.h"
Expand All @@ -50,6 +53,9 @@ std::once_flag addServiceFlag;

int CopysetNodeManager::Init(const CopysetNodeOptions &copysetNodeOptions) {
copysetNodeOptions_ = copysetNodeOptions;
CopysetNode::syncTriggerSeconds_ = copysetNodeOptions.syncTriggerSeconds;
CopysetNode::copysetSyncPool_ =
std::make_shared<common::TaskThreadPool<>>();
if (copysetNodeOptions_.loadConcurrency > 0) {
copysetLoader_ = std::make_shared<TaskThreadPool<>>();
} else {
Expand All @@ -62,7 +68,8 @@ int CopysetNodeManager::Run() {
if (running_.exchange(true, std::memory_order_acq_rel)) {
return 0;
}

CopysetNode::copysetSyncPool_->Start(copysetNodeOptions_.syncConcurrency);
assert(copysetNodeOptions_.syncConcurrency > 0);
int ret = 0;
// 启动线程池
if (copysetLoader_ != nullptr) {
Expand All @@ -89,7 +96,7 @@ int CopysetNodeManager::Fini() {
return 0;
}
loadFinished_.exchange(false, std::memory_order_acq_rel);

CopysetNode::copysetSyncPool_->Stop();
if (copysetLoader_ != nullptr) {
copysetLoader_->Stop();
copysetLoader_ = nullptr;
Expand Down Expand Up @@ -317,7 +324,6 @@ bool CopysetNodeManager::CreateCopysetNode(const LogicPoolID &logicPoolId,
<< " run failed";
return false;
}

copysetNodeMap_.insert(std::pair<GroupId, std::shared_ptr<CopysetNode>>(
groupId,
copysetNode));
Expand Down Expand Up @@ -360,7 +366,6 @@ std::shared_ptr<CopysetNode> CopysetNodeManager::CreateCopysetNodeUnlocked(
<< " run failed";
return nullptr;
}

return copysetNode;
}

Expand Down
Loading