Skip to content

Commit 94bdd7b

Browse files
committed
curvefs/client: add feature of warmup
1 parent fa1e288 commit 94bdd7b

File tree

6 files changed

+582
-9
lines changed

6 files changed

+582
-9
lines changed

curvefs/src/client/fuse_client.cpp

+220-6
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include <set>
3232
#include <unordered_map>
3333
#include <utility>
34+
#include <boost/algorithm/string.hpp>
3435

3536
#include "curvefs/proto/mds.pb.h"
3637
#include "curvefs/src/client/fuse_common.h"
@@ -74,6 +75,15 @@ using rpcclient::Cli2ClientImpl;
7475
using rpcclient::MetaCache;
7576
using common::FLAGS_enableCto;
7677

78+
static bool checkWarmupListPath(const char*, const std::string& target) {
79+
// do something to check the path
80+
LOG(INFO) << "warmupListPath: " << target;
81+
return true;
82+
}
83+
DEFINE_string(warmupListPath, "",
84+
"the path to the list of files (dirs) that need to warmup.");
85+
DEFINE_validator(warmupListPath, checkWarmupListPath);
86+
7787
CURVEFS_ERROR FuseClient::Init(const FuseClientOption &option) {
7888
option_ = option;
7989

@@ -121,17 +131,224 @@ CURVEFS_ERROR FuseClient::Init(const FuseClientOption &option) {
121131
if (ret3 != CURVEFS_ERROR::OK) {
122132
return ret3;
123133
}
124-
125134
ret3 =
126135
dentryManager_->Init(option.dCacheLruSize, option.enableDCacheMetrics);
127136
if (ret3 != CURVEFS_ERROR::OK) {
128137
return ret3;
129138
}
130-
139+
warmUpFile_.exist = false;
140+
bgCmdTaskStop_.store(false, std::memory_order_release);
141+
bgCmdTaskThread_ = Thread(&FuseClient::BackGroundCmdTask, this);
142+
bgCmdStop_.store(false, std::memory_order_release);
143+
bgCmdThread_ = Thread(&FuseClient::BackGroundCmd, this);
144+
FLAGS_warmupListPath = "";
145+
taskFetchMetaPool_.Start(WARMUP_THREADS);
131146
return ret3;
132147
}
133148

149+
void FuseClient::BackGroundCmd() {
150+
std::string preWarmUpPath = FLAGS_warmupListPath;
151+
std::string warmUpPath;
152+
while (!bgCmdStop_.load(std::memory_order_acquire)) {
153+
warmUpPath = FLAGS_warmupListPath;
154+
if (warmUpPath == preWarmUpPath) {
155+
usleep(WARMUP_CHECKINTERVAL_US); // check interval
156+
continue;
157+
}
158+
VLOG(6) << "has new warmUp task: " << warmUpPath;
159+
preWarmUpPath = warmUpPath;
160+
PutWarmTask(warmUpPath);
161+
WarmUpRun();
162+
}
163+
return;
164+
}
165+
166+
void FuseClient::BackGroundCmdTask() {
167+
while (!bgCmdStop_.load(std::memory_order_acquire)) {
168+
std::list<std::string> readAheadPaths;
169+
WaitWarmUp();
170+
while (hasWarmTask()) {
171+
std::string warmUpTask;
172+
GetarmTask(warmUpTask);
173+
if (warmUpTask.empty()) {
174+
continue;
175+
}
176+
VLOG(6) << "warmUp task is: " << warmUpTask;
177+
std::vector<std::string> splitPath;
178+
boost::split(splitPath, warmUpTask, boost::is_any_of("/"), boost::token_compress_on);
179+
Dentry dentry;
180+
CURVEFS_ERROR ret = dentryManager_->GetDentry(
181+
fsInfo_->rootinodeid(), splitPath[1], &dentry);
182+
if (ret != CURVEFS_ERROR::OK) {
183+
if (ret != CURVEFS_ERROR::NOTEXIST) {
184+
LOG(WARNING) << "dentryManager_ get dentry fail: "
185+
<< ret << ", name: " << warmUpTask;
186+
}
187+
VLOG(1) << "FetchDentry error: " << ret;
188+
return;
189+
}
190+
if (FsFileType::TYPE_S3 != dentry.type()) {
191+
VLOG(3) << "not a file: " << warmUpTask;
192+
return;
193+
}
194+
195+
fuse_ino_t ino = dentry.inodeid();
196+
std::shared_ptr<InodeWrapper> inodeWrapper;
197+
ret = inodeManager_->GetInode(ino, inodeWrapper);
198+
if (ret != CURVEFS_ERROR::OK) {
199+
LOG(ERROR) << "inodeManager get inode fail, ret = "
200+
<< ret << ", inodeid = " << ino;
201+
return;
202+
}
203+
uint64_t len = inodeWrapper->GetLength();
204+
VLOG(9) << "ino is: " << ino << ", len is: " << len;
205+
WarmUpFileContext_t warmUpFile{ino, len, true};
206+
SetWarmUpFile(warmUpFile);
207+
}
208+
}
209+
}
210+
211+
void FuseClient::FetchDentryEnqueue(std::string file) {
212+
VLOG(6) << "FetchDentryEnqueue start: " << file;
213+
auto task = [this, file]() {
214+
LookPath(file);
215+
};
216+
taskFetchMetaPool_.Enqueue(task);
217+
}
218+
219+
void FuseClient::LookPath(std::string file) {
220+
VLOG(6) << "LookPath start: " << file;
221+
// remove the blank
222+
boost::trim(file);
223+
std::vector<std::string> splitPath;
224+
boost::split(splitPath, file, boost::is_any_of("/"), boost::token_compress_on);
225+
if (splitPath.size() == 2
226+
&& splitPath.back().empty()) {
227+
VLOG(6) << "i am root";
228+
FetchChildDentryEnqueue(fsInfo_->rootinodeid());
229+
return;
230+
} else if (splitPath.size() == 2) {
231+
VLOG(6) << "parent is root: " << fsInfo_->rootinodeid()
232+
<< ", path is: " << splitPath[1];
233+
this->FetchDentry(fsInfo_->rootinodeid(), splitPath[1]);
234+
return;
235+
} else if (splitPath.size() > 2) { // travel path
236+
VLOG(6) << "traverse path size: " << splitPath.size() ;
237+
std::string lastName = splitPath.back();
238+
splitPath.pop_back();
239+
fuse_ino_t ino = fsInfo_->rootinodeid();
240+
auto iter = splitPath.begin();
241+
// the first member is always empty, so skip
242+
iter++;
243+
for (; iter != splitPath.end(); iter++) {
244+
VLOG(9) << "traverse path: " << *iter
245+
<< "ino is: " << ino;
246+
Dentry dentry;
247+
std::string pathName = *iter;
248+
CURVEFS_ERROR ret = dentryManager_->GetDentry(ino, pathName, &dentry);
249+
if (ret != CURVEFS_ERROR::OK) {
250+
if (ret != CURVEFS_ERROR::NOTEXIST) {
251+
LOG(WARNING) << "dentryManager_ get dentry fail, ret = " << ret
252+
<< ", parent inodeid = " << ino
253+
<< ", name = " << file;
254+
}
255+
VLOG(1) << "FetchDentry error: " << ret;
256+
return;
257+
}
258+
ino = dentry.inodeid();
259+
}
260+
this->FetchDentry(ino, lastName);
261+
VLOG(9) << "ino is: " << ino
262+
<< "lastname is: " << lastName;
263+
return;
264+
} else {
265+
VLOG(0) << "unknown path";
266+
}
267+
return;
268+
}
269+
270+
void FuseClient::FetchChildDentryEnqueue(fuse_ino_t ino) {
271+
auto task = [this, ino]() {
272+
// resolve层层递进,获得inode
273+
this->FetchChildDentry(ino);
274+
};
275+
taskFetchMetaPool_.Enqueue(task);
276+
}
277+
278+
void FuseClient::FetchChildDentry(fuse_ino_t ino) {
279+
VLOG(9) << "FetchChildDentry start: " << ino;
280+
std::list<Dentry> dentryList;
281+
auto limit = option_.listDentryLimit;
282+
CURVEFS_ERROR ret = dentryManager_->ListDentry(
283+
ino, &dentryList, limit);
284+
if (ret != CURVEFS_ERROR::OK) {
285+
LOG(ERROR) << "dentryManager_ ListDentry fail, ret = " << ret
286+
<< ", parent = " << ino;
287+
return;
288+
}
289+
for (auto iter : dentryList) {
290+
VLOG(9) << "FetchChildDentry: " << iter.name();
291+
if (FsFileType::TYPE_S3 == iter.type()) {
292+
std::unique_lock<std::mutex> lck(fetchMtx_);
293+
readAheadFiles_.push_front(iter.inodeid());
294+
VLOG(9) << "FetchChildDentry: " << iter.inodeid();;
295+
} else if (FsFileType::TYPE_DIRECTORY == iter.type()) {
296+
FetchChildDentryEnqueue(iter.inodeid());
297+
VLOG(9) << "FetchChildDentry: " << iter.inodeid();
298+
} else if (FsFileType::TYPE_SYM_LINK == iter.type()) { // need todo
299+
300+
} else {
301+
VLOG(0) << "unknown type";
302+
}
303+
}
304+
return;
305+
}
306+
307+
void FuseClient::FetchDentry(fuse_ino_t ino, std::string file) {
308+
VLOG(9) << "FetchDentry start: " << file
309+
<< ", ino: " << ino;
310+
Dentry dentry;
311+
CURVEFS_ERROR ret = dentryManager_->GetDentry(ino, file, &dentry);
312+
if (ret != CURVEFS_ERROR::OK) {
313+
if (ret != CURVEFS_ERROR::NOTEXIST) {
314+
LOG(WARNING) << "dentryManager_ get dentry fail, ret = " << ret
315+
<< ", parent inodeid = " << ino
316+
<< ", name = " << file;
317+
}
318+
VLOG(1) << "FetchDentry error: " << ret;
319+
return;
320+
}
321+
if (FsFileType::TYPE_S3 == dentry.type()) {
322+
std::unique_lock<std::mutex> lck(fetchMtx_);
323+
readAheadFiles_.push_front(dentry.inodeid());
324+
return;
325+
} else if (FsFileType::TYPE_DIRECTORY == dentry.type()) {
326+
FetchChildDentryEnqueue(dentry.inodeid());
327+
VLOG(9) << "FetchDentry: " << dentry.inodeid();
328+
return;
329+
330+
} else if (FsFileType::TYPE_SYM_LINK == dentry.type()){
331+
332+
} else {
333+
VLOG(0) << "unkown, file: " << file
334+
<< ", ino: " << ino;
335+
}
336+
VLOG(9) << "FetchDentry end: " << file
337+
<< ", ino: " << ino;
338+
return;
339+
}
340+
134341
void FuseClient::UnInit() {
342+
bgCmdTaskStop_.store(true, std::memory_order_release);
343+
bgCmdStop_.store(true, std::memory_order_release);
344+
WarmUpRun();
345+
if (bgCmdTaskThread_.joinable()) {
346+
bgCmdTaskThread_.join();
347+
}
348+
if (bgCmdThread_.joinable()) {
349+
bgCmdThread_.join();
350+
}
351+
taskFetchMetaPool_.Stop();
135352
delete mdsBase_;
136353
mdsBase_ = nullptr;
137354
}
@@ -186,7 +403,6 @@ CURVEFS_ERROR FuseClient::FuseOpInit(void *userdata,
186403
<< ", mountPoint = " << mountpoint_.ShortDebugString();
187404
return CURVEFS_ERROR::MOUNT_FAILED;
188405
}
189-
190406
inodeManager_->SetFsId(fsInfo_->fsid());
191407
dentryManager_->SetFsId(fsInfo_->fsid());
192408
enableSumInDir_ = fsInfo_->enablesumindir() && !FLAGS_enableCto;
@@ -204,7 +420,7 @@ CURVEFS_ERROR FuseClient::FuseOpInit(void *userdata,
204420
}
205421

206422
init_ = true;
207-
423+
mounted_.store(true, std::memory_order_release);
208424
return CURVEFS_ERROR::OK;
209425
}
210426

@@ -323,9 +539,7 @@ CURVEFS_ERROR FuseClient::FuseOpOpen(fuse_req_t req, fuse_ino_t ino,
323539
<< ", inodeid = " << ino;
324540
return ret;
325541
}
326-
327542
::curve::common::UniqueLock lgGuard = inodeWrapper->GetUniqueLock();
328-
329543
ret = inodeWrapper->Open();
330544
if (ret != CURVEFS_ERROR::OK) {
331545
return ret;

0 commit comments

Comments
 (0)