Skip to content

Commit c570cbd

Browse files
committed
curvefs/client: add feature of warmup
1 parent 207f26b commit c570cbd

File tree

6 files changed

+582
-8
lines changed

6 files changed

+582
-8
lines changed

curvefs/src/client/fuse_client.cpp

+220-5
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include <set>
3232
#include <unordered_map>
3333
#include <utility>
34+
#include <boost/algorithm/string.hpp>
3435

3536
#include "curvefs/proto/mds.pb.h"
3637
#include "curvefs/src/client/fuse_common.h"
@@ -75,6 +76,15 @@ using rpcclient::Cli2ClientImpl;
7576
using rpcclient::MetaCache;
7677
using common::FLAGS_enableCto;
7778

79+
static bool checkWarmupListPath(const char*, const std::string& target) {
80+
// do something to check the path
81+
LOG(INFO) << "warmupListPath: " << target;
82+
return true;
83+
}
84+
DEFINE_string(warmupListPath, "",
85+
"the path to the list of files (dirs) that need to warmup.");
86+
DEFINE_validator(warmupListPath, checkWarmupListPath);
87+
7888
CURVEFS_ERROR FuseClient::Init(const FuseClientOption &option) {
7989
option_ = option;
8090

@@ -122,17 +132,224 @@ CURVEFS_ERROR FuseClient::Init(const FuseClientOption &option) {
122132
if (ret3 != CURVEFS_ERROR::OK) {
123133
return ret3;
124134
}
125-
126135
ret3 =
127136
dentryManager_->Init(option.dCacheLruSize, option.enableDCacheMetrics);
128137
if (ret3 != CURVEFS_ERROR::OK) {
129138
return ret3;
130139
}
131-
140+
warmUpFile_.exist = false;
141+
bgCmdTaskStop_.store(false, std::memory_order_release);
142+
bgCmdTaskThread_ = Thread(&FuseClient::BackGroundCmdTask, this);
143+
bgCmdStop_.store(false, std::memory_order_release);
144+
bgCmdThread_ = Thread(&FuseClient::BackGroundCmd, this);
145+
FLAGS_warmupListPath = "";
146+
taskFetchMetaPool_.Start(WARMUP_THREADS);
132147
return ret3;
133148
}
134149

150+
void FuseClient::BackGroundCmd() {
151+
std::string preWarmUpPath = FLAGS_warmupListPath;
152+
std::string warmUpPath;
153+
while (!bgCmdStop_.load(std::memory_order_acquire)) {
154+
warmUpPath = FLAGS_warmupListPath;
155+
if (warmUpPath == preWarmUpPath) {
156+
usleep(WARMUP_CHECKINTERVAL_US); // check interval
157+
continue;
158+
}
159+
VLOG(6) << "has new warmUp task: " << warmUpPath;
160+
preWarmUpPath = warmUpPath;
161+
PutWarmTask(warmUpPath);
162+
WarmUpRun();
163+
}
164+
return;
165+
}
166+
167+
void FuseClient::BackGroundCmdTask() {
168+
while (!bgCmdStop_.load(std::memory_order_acquire)) {
169+
std::list<std::string> readAheadPaths;
170+
WaitWarmUp();
171+
while (hasWarmTask()) {
172+
std::string warmUpTask;
173+
GetarmTask(warmUpTask);
174+
if (warmUpTask.empty()) {
175+
continue;
176+
}
177+
VLOG(6) << "warmUp task is: " << warmUpTask;
178+
std::vector<std::string> splitPath;
179+
boost::split(splitPath, warmUpTask, boost::is_any_of("/"), boost::token_compress_on);
180+
Dentry dentry;
181+
CURVEFS_ERROR ret = dentryManager_->GetDentry(
182+
fsInfo_->rootinodeid(), splitPath[1], &dentry);
183+
if (ret != CURVEFS_ERROR::OK) {
184+
if (ret != CURVEFS_ERROR::NOTEXIST) {
185+
LOG(WARNING) << "dentryManager_ get dentry fail: "
186+
<< ret << ", name: " << warmUpTask;
187+
}
188+
VLOG(1) << "FetchDentry error: " << ret;
189+
return;
190+
}
191+
if (FsFileType::TYPE_S3 != dentry.type()) {
192+
VLOG(3) << "not a file: " << warmUpTask;
193+
return;
194+
}
195+
196+
fuse_ino_t ino = dentry.inodeid();
197+
std::shared_ptr<InodeWrapper> inodeWrapper;
198+
ret = inodeManager_->GetInode(ino, inodeWrapper);
199+
if (ret != CURVEFS_ERROR::OK) {
200+
LOG(ERROR) << "inodeManager get inode fail, ret = "
201+
<< ret << ", inodeid = " << ino;
202+
return;
203+
}
204+
uint64_t len = inodeWrapper->GetLength();
205+
VLOG(9) << "ino is: " << ino << ", len is: " << len;
206+
WarmUpFileContext_t warmUpFile{ino, len, true};
207+
SetWarmUpFile(warmUpFile);
208+
}
209+
}
210+
}
211+
212+
void FuseClient::FetchDentryEnqueue(std::string file) {
213+
VLOG(6) << "FetchDentryEnqueue start: " << file;
214+
auto task = [this, file]() {
215+
LookPath(file);
216+
};
217+
taskFetchMetaPool_.Enqueue(task);
218+
}
219+
220+
void FuseClient::LookPath(std::string file) {
221+
VLOG(6) << "LookPath start: " << file;
222+
// remove the blank
223+
boost::trim(file);
224+
std::vector<std::string> splitPath;
225+
boost::split(splitPath, file, boost::is_any_of("/"), boost::token_compress_on);
226+
if (splitPath.size() == 2
227+
&& splitPath.back().empty()) {
228+
VLOG(6) << "i am root";
229+
FetchChildDentryEnqueue(fsInfo_->rootinodeid());
230+
return;
231+
} else if (splitPath.size() == 2) {
232+
VLOG(6) << "parent is root: " << fsInfo_->rootinodeid()
233+
<< ", path is: " << splitPath[1];
234+
this->FetchDentry(fsInfo_->rootinodeid(), splitPath[1]);
235+
return;
236+
} else if (splitPath.size() > 2) { // travel path
237+
VLOG(6) << "traverse path size: " << splitPath.size() ;
238+
std::string lastName = splitPath.back();
239+
splitPath.pop_back();
240+
fuse_ino_t ino = fsInfo_->rootinodeid();
241+
auto iter = splitPath.begin();
242+
// the first member is always empty, so skip
243+
iter++;
244+
for (; iter != splitPath.end(); iter++) {
245+
VLOG(9) << "traverse path: " << *iter
246+
<< "ino is: " << ino;
247+
Dentry dentry;
248+
std::string pathName = *iter;
249+
CURVEFS_ERROR ret = dentryManager_->GetDentry(ino, pathName, &dentry);
250+
if (ret != CURVEFS_ERROR::OK) {
251+
if (ret != CURVEFS_ERROR::NOTEXIST) {
252+
LOG(WARNING) << "dentryManager_ get dentry fail, ret = " << ret
253+
<< ", parent inodeid = " << ino
254+
<< ", name = " << file;
255+
}
256+
VLOG(1) << "FetchDentry error: " << ret;
257+
return;
258+
}
259+
ino = dentry.inodeid();
260+
}
261+
this->FetchDentry(ino, lastName);
262+
VLOG(9) << "ino is: " << ino
263+
<< "lastname is: " << lastName;
264+
return;
265+
} else {
266+
VLOG(0) << "unknown path";
267+
}
268+
return;
269+
}
270+
271+
void FuseClient::FetchChildDentryEnqueue(fuse_ino_t ino) {
272+
auto task = [this, ino]() {
273+
// resolve层层递进,获得inode
274+
this->FetchChildDentry(ino);
275+
};
276+
taskFetchMetaPool_.Enqueue(task);
277+
}
278+
279+
void FuseClient::FetchChildDentry(fuse_ino_t ino) {
280+
VLOG(9) << "FetchChildDentry start: " << ino;
281+
std::list<Dentry> dentryList;
282+
auto limit = option_.listDentryLimit;
283+
CURVEFS_ERROR ret = dentryManager_->ListDentry(
284+
ino, &dentryList, limit);
285+
if (ret != CURVEFS_ERROR::OK) {
286+
LOG(ERROR) << "dentryManager_ ListDentry fail, ret = " << ret
287+
<< ", parent = " << ino;
288+
return;
289+
}
290+
for (auto iter : dentryList) {
291+
VLOG(9) << "FetchChildDentry: " << iter.name();
292+
if (FsFileType::TYPE_S3 == iter.type()) {
293+
std::unique_lock<std::mutex> lck(fetchMtx_);
294+
readAheadFiles_.push_front(iter.inodeid());
295+
VLOG(9) << "FetchChildDentry: " << iter.inodeid();;
296+
} else if (FsFileType::TYPE_DIRECTORY == iter.type()) {
297+
FetchChildDentryEnqueue(iter.inodeid());
298+
VLOG(9) << "FetchChildDentry: " << iter.inodeid();
299+
} else if (FsFileType::TYPE_SYM_LINK == iter.type()) { // need todo
300+
301+
} else {
302+
VLOG(0) << "unknown type";
303+
}
304+
}
305+
return;
306+
}
307+
308+
void FuseClient::FetchDentry(fuse_ino_t ino, std::string file) {
309+
VLOG(9) << "FetchDentry start: " << file
310+
<< ", ino: " << ino;
311+
Dentry dentry;
312+
CURVEFS_ERROR ret = dentryManager_->GetDentry(ino, file, &dentry);
313+
if (ret != CURVEFS_ERROR::OK) {
314+
if (ret != CURVEFS_ERROR::NOTEXIST) {
315+
LOG(WARNING) << "dentryManager_ get dentry fail, ret = " << ret
316+
<< ", parent inodeid = " << ino
317+
<< ", name = " << file;
318+
}
319+
VLOG(1) << "FetchDentry error: " << ret;
320+
return;
321+
}
322+
if (FsFileType::TYPE_S3 == dentry.type()) {
323+
std::unique_lock<std::mutex> lck(fetchMtx_);
324+
readAheadFiles_.push_front(dentry.inodeid());
325+
return;
326+
} else if (FsFileType::TYPE_DIRECTORY == dentry.type()) {
327+
FetchChildDentryEnqueue(dentry.inodeid());
328+
VLOG(9) << "FetchDentry: " << dentry.inodeid();
329+
return;
330+
331+
} else if (FsFileType::TYPE_SYM_LINK == dentry.type()){
332+
333+
} else {
334+
VLOG(0) << "unkown, file: " << file
335+
<< ", ino: " << ino;
336+
}
337+
VLOG(9) << "FetchDentry end: " << file
338+
<< ", ino: " << ino;
339+
return;
340+
}
341+
135342
void FuseClient::UnInit() {
343+
bgCmdTaskStop_.store(true, std::memory_order_release);
344+
bgCmdStop_.store(true, std::memory_order_release);
345+
WarmUpRun();
346+
if (bgCmdTaskThread_.joinable()) {
347+
bgCmdTaskThread_.join();
348+
}
349+
if (bgCmdThread_.joinable()) {
350+
bgCmdThread_.join();
351+
}
352+
taskFetchMetaPool_.Stop();
136353
delete mdsBase_;
137354
mdsBase_ = nullptr;
138355
}
@@ -178,7 +395,6 @@ CURVEFS_ERROR FuseClient::FuseOpInit(void *userdata,
178395
<< ", mountPoint = " << mountpoint_.ShortDebugString();
179396
return CURVEFS_ERROR::MOUNT_FAILED;
180397
}
181-
182398
inodeManager_->SetFsId(fsInfo_->fsid());
183399
dentryManager_->SetFsId(fsInfo_->fsid());
184400
enableSumInDir_ = fsInfo_->enablesumindir() && !FLAGS_enableCto;
@@ -196,7 +412,7 @@ CURVEFS_ERROR FuseClient::FuseOpInit(void *userdata,
196412
}
197413

198414
init_ = true;
199-
415+
mounted_.store(true, std::memory_order_release);
200416
return CURVEFS_ERROR::OK;
201417
}
202418

@@ -318,7 +534,6 @@ CURVEFS_ERROR FuseClient::FuseOpOpen(fuse_req_t req, fuse_ino_t ino,
318534
<< ", inodeid = " << ino;
319535
return ret;
320536
}
321-
322537
::curve::common::UniqueLock lgGuard = inodeWrapper->GetUniqueLock();
323538
if (fi->flags & O_TRUNC) {
324539
if (fi->flags & O_WRONLY || fi->flags & O_RDWR) {

0 commit comments

Comments
 (0)