Skip to content

Commit 7228204

Browse files
committed
curvefs/client: add feature of warmup
1 parent 207f26b commit 7228204

File tree

7 files changed

+1268
-17
lines changed

7 files changed

+1268
-17
lines changed

curvefs/src/client/fuse_client.cpp

+267-6
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,15 @@ using rpcclient::Cli2ClientImpl;
7575
using rpcclient::MetaCache;
7676
using common::FLAGS_enableCto;
7777

78+
static bool checkWarmupListPath(const char*, const std::string& target) {
79+
// do something to check the path
80+
LOG(INFO) << "warmupListPath: " << target;
81+
return true;
82+
}
83+
DEFINE_string(warmupListPath, "",
84+
"the path to the list of files (dirs) that need to warmup.");
85+
DEFINE_validator(warmupListPath, checkWarmupListPath);
86+
7887
CURVEFS_ERROR FuseClient::Init(const FuseClientOption &option) {
7988
option_ = option;
8089

@@ -122,17 +131,272 @@ CURVEFS_ERROR FuseClient::Init(const FuseClientOption &option) {
122131
if (ret3 != CURVEFS_ERROR::OK) {
123132
return ret3;
124133
}
125-
126134
ret3 =
127135
dentryManager_->Init(option.dCacheLruSize, option.enableDCacheMetrics);
128136
if (ret3 != CURVEFS_ERROR::OK) {
129137
return ret3;
130138
}
131-
139+
warmUpFile_.exist = false;
140+
bgCmdTaskStop_.store(false, std::memory_order_release);
141+
bgCmdTaskThread_ = Thread(&FuseClient::BackGroundCmdTask, this);
142+
bgCmdStop_.store(false, std::memory_order_release);
143+
bgCmdThread_ = Thread(&FuseClient::BackGroundCmd, this);
144+
FLAGS_warmupListPath = "";
145+
taskFetchMetaPool_.Start(WARMUP_THREADS);
132146
return ret3;
133147
}
134148

149+
void FuseClient::BackGroundCmd() {
150+
while (!mounted_.load(std::memory_order_acquire)) {
151+
usleep(WARMUP_CHECKINTERVAL_US);
152+
VLOG(6) << "wait mount success.";
153+
continue;
154+
}
155+
std::string preWarmUpPath = FLAGS_warmupListPath;
156+
std::string warmUpPath;
157+
// whs test
158+
FLAGS_warmupListPath = "/whstest02";
159+
while (!bgCmdStop_.load(std::memory_order_acquire)) {
160+
warmUpPath = FLAGS_warmupListPath;
161+
if (warmUpPath == preWarmUpPath) {
162+
usleep(WARMUP_CHECKINTERVAL_US); // check interval
163+
continue;
164+
}
165+
VLOG(6) << "has new warmUp task: " << warmUpPath;
166+
preWarmUpPath = warmUpPath;
167+
PutWarmTask(warmUpPath);
168+
WarmUpRun();
169+
}
170+
return;
171+
}
172+
173+
void FuseClient::BackGroundCmdTask() {
174+
while (!mounted_.load(std::memory_order_acquire)) {
175+
usleep(WARMUP_CHECKINTERVAL_US);
176+
VLOG(6) << "wait mount success.";
177+
continue;
178+
}
179+
while (!bgCmdStop_.load(std::memory_order_acquire)) {
180+
std::list<std::string> readAheadPaths;
181+
WaitWarmUp();
182+
while (hasWarmTask()) {
183+
std::string warmUpTask;
184+
GetarmTask(&warmUpTask);
185+
if (warmUpTask.empty()) {
186+
continue;
187+
}
188+
VLOG(6) << "warmup task is: " << warmUpTask;
189+
std::string pDelimiter = "/";
190+
char* pToken = nullptr;
191+
char* pSave = nullptr;
192+
pToken = strtok_r(const_cast<char*>(warmUpTask.c_str()),
193+
const_cast<char*>(pDelimiter.c_str()), &pSave);
194+
if (nullptr == pToken) {
195+
VLOG(3) << "warmUpTask nullptr";
196+
continue;
197+
}
198+
Dentry dentry;
199+
CURVEFS_ERROR ret = dentryManager_->GetDentry(
200+
fsInfo_->rootinodeid(), pToken, &dentry);
201+
if (ret != CURVEFS_ERROR::OK) {
202+
if (ret != CURVEFS_ERROR::NOTEXIST) {
203+
LOG(WARNING) << "dentryManager_ get dentry fail: "
204+
<< ret << ", name: " << warmUpTask;
205+
}
206+
LOG(WARNING) << "FetchDentry error: " << ret
207+
<< ", name: " << warmUpTask;
208+
return;
209+
}
210+
if (FsFileType::TYPE_S3 != dentry.type()) {
211+
LOG(WARNING) << "not a file: " << warmUpTask
212+
<< "type is: " << dentry.type();
213+
return;
214+
}
215+
fuse_ino_t ino = dentry.inodeid();
216+
std::shared_ptr<InodeWrapper> inodeWrapper;
217+
ret = inodeManager_->GetInode(ino, inodeWrapper);
218+
if (ret != CURVEFS_ERROR::OK) {
219+
LOG(ERROR) << "inodeManager get inode fail, ret = "
220+
<< ret << ", inodeid = " << ino;
221+
return;
222+
}
223+
uint64_t len = inodeWrapper->GetLength();
224+
VLOG(9) << "ino is: " << ino << ", len is: " << len;
225+
WarmUpFileContext_t warmUpFile{ino, len, true};
226+
SetWarmUpFile(warmUpFile);
227+
}
228+
}
229+
}
230+
231+
void FuseClient::FetchDentryEnqueue(std::string file) {
232+
VLOG(6) << "FetchDentryEnqueue start: " << file;
233+
auto task = [this, file]() {
234+
LookPath(file);
235+
};
236+
taskFetchMetaPool_.Enqueue(task);
237+
}
238+
239+
void FuseClient::LookPath(std::string file) {
240+
VLOG(6) << "LookPath start: " << file;
241+
std::vector<std::string> splitPath;
242+
// remove enter, newline, blank
243+
std::string blanks("\r\n ");
244+
file.erase(0, file.find_first_not_of(blanks));
245+
file.erase(file.find_last_not_of(blanks) + 1);
246+
if (file.empty()) {
247+
VLOG(0) << "empty path";
248+
return;
249+
}
250+
bool isRoot = false;
251+
if (file == "/") {
252+
splitPath.push_back(file);
253+
isRoot = true;
254+
} else {
255+
std::string pDelimiter = "/";
256+
char* pToken = nullptr;
257+
char* pSave = nullptr;
258+
pToken = strtok_r(const_cast<char*>(file.c_str()),
259+
const_cast<char*>(pDelimiter.c_str()), &pSave);
260+
if (nullptr == pToken) {
261+
VLOG(3) << "lookpath end";
262+
return;
263+
}
264+
splitPath.push_back(pToken);
265+
while (true) {
266+
pToken = strtok_r(NULL, const_cast<char*>(
267+
pDelimiter.c_str()), &pSave);
268+
if (nullptr == pToken) {
269+
VLOG(3) << "lookpath end";
270+
break;
271+
}
272+
VLOG(9) << "pToken is:" << pToken
273+
<< "pSave:" << pSave;
274+
splitPath.push_back(pToken);
275+
}
276+
}
277+
VLOG(3) << "splitPath size is: " << splitPath.size();
278+
if (splitPath.size() == 1 && isRoot) {
279+
VLOG(9) << "i am root";
280+
FetchChildDentryEnqueue(fsInfo_->rootinodeid());
281+
return;
282+
} else if (splitPath.size() == 1) {
283+
VLOG(9) << "parent is root: " << fsInfo_->rootinodeid()
284+
<< ", path is: " << splitPath[0];
285+
this->FetchDentry(fsInfo_->rootinodeid(), splitPath[0]);
286+
return;
287+
} else if (splitPath.size() > 1) { // travel path
288+
VLOG(9) << "traverse path start: " << splitPath.size();
289+
std::string lastName = splitPath.back();
290+
splitPath.pop_back();
291+
fuse_ino_t ino = fsInfo_->rootinodeid();
292+
for (auto iter : splitPath) {
293+
VLOG(9) << "traverse path: " << iter
294+
<< "ino is: " << ino;
295+
Dentry dentry;
296+
std::string pathName = iter;
297+
CURVEFS_ERROR ret = dentryManager_->GetDentry(
298+
ino, pathName, &dentry);
299+
if (ret != CURVEFS_ERROR::OK) {
300+
if (ret != CURVEFS_ERROR::NOTEXIST) {
301+
LOG(WARNING) << "dentryManager_ get dentry fail, ret = "
302+
<< ret << ", parent inodeid = " << ino
303+
<< ", name = " << file;
304+
}
305+
VLOG(0) << "whs FetchDentry error: " << ret;
306+
return;
307+
}
308+
ino = dentry.inodeid();
309+
}
310+
this->FetchDentry(ino, lastName);
311+
VLOG(9) << "ino is: " << ino
312+
<< "lastname is: " << lastName;
313+
return;
314+
} else {
315+
VLOG(0) << "unknown path";
316+
}
317+
return;
318+
}
319+
320+
void FuseClient::FetchChildDentryEnqueue(fuse_ino_t ino) {
321+
auto task = [this, ino]() {
322+
// resolve层层递进,获得inode
323+
this->FetchChildDentry(ino);
324+
};
325+
taskFetchMetaPool_.Enqueue(task);
326+
}
327+
328+
void FuseClient::FetchChildDentry(fuse_ino_t ino) {
329+
VLOG(9) << "FetchChildDentry start: " << ino;
330+
std::list<Dentry> dentryList;
331+
auto limit = option_.listDentryLimit;
332+
CURVEFS_ERROR ret = dentryManager_->ListDentry(
333+
ino, &dentryList, limit);
334+
if (ret != CURVEFS_ERROR::OK) {
335+
LOG(ERROR) << "dentryManager_ ListDentry fail, ret = " << ret
336+
<< ", parent = " << ino;
337+
return;
338+
}
339+
for (auto iter : dentryList) {
340+
VLOG(9) << "FetchChildDentry: " << iter.name();
341+
if (FsFileType::TYPE_S3 == iter.type()) {
342+
std::unique_lock<std::mutex> lck(fetchMtx_);
343+
readAheadFiles_.push_front(iter.inodeid());
344+
VLOG(9) << "FetchChildDentry: " << iter.inodeid();;
345+
} else if (FsFileType::TYPE_DIRECTORY == iter.type()) {
346+
FetchChildDentryEnqueue(iter.inodeid());
347+
VLOG(9) << "FetchChildDentry: " << iter.inodeid();
348+
} else if (FsFileType::TYPE_SYM_LINK == iter.type()) { // need todo
349+
} else {
350+
VLOG(0) << "unknown type";
351+
}
352+
}
353+
return;
354+
}
355+
356+
void FuseClient::FetchDentry(fuse_ino_t ino, std::string file) {
357+
VLOG(9) << "FetchDentry start: " << file
358+
<< ", ino: " << ino;
359+
Dentry dentry;
360+
CURVEFS_ERROR ret = dentryManager_->GetDentry(ino, file, &dentry);
361+
if (ret != CURVEFS_ERROR::OK) {
362+
if (ret != CURVEFS_ERROR::NOTEXIST) {
363+
LOG(WARNING) << "dentryManager_ get dentry fail, ret = " << ret
364+
<< ", parent inodeid = " << ino
365+
<< ", name = " << file;
366+
}
367+
VLOG(1) << "FetchDentry error: " << ret;
368+
return;
369+
}
370+
if (FsFileType::TYPE_S3 == dentry.type()) {
371+
std::unique_lock<std::mutex> lck(fetchMtx_);
372+
readAheadFiles_.push_front(dentry.inodeid());
373+
return;
374+
} else if (FsFileType::TYPE_DIRECTORY == dentry.type()) {
375+
FetchChildDentryEnqueue(dentry.inodeid());
376+
VLOG(9) << "FetchDentry: " << dentry.inodeid();
377+
return;
378+
379+
} else if (FsFileType::TYPE_SYM_LINK == dentry.type()) {
380+
} else {
381+
VLOG(0) << "unkown, file: " << file
382+
<< ", ino: " << ino;
383+
}
384+
VLOG(9) << "FetchDentry end: " << file
385+
<< ", ino: " << ino;
386+
return;
387+
}
388+
135389
void FuseClient::UnInit() {
390+
bgCmdTaskStop_.store(true, std::memory_order_release);
391+
bgCmdStop_.store(true, std::memory_order_release);
392+
WarmUpRun();
393+
if (bgCmdTaskThread_.joinable()) {
394+
bgCmdTaskThread_.join();
395+
}
396+
if (bgCmdThread_.joinable()) {
397+
bgCmdThread_.join();
398+
}
399+
taskFetchMetaPool_.Stop();
136400
delete mdsBase_;
137401
mdsBase_ = nullptr;
138402
}
@@ -178,7 +442,6 @@ CURVEFS_ERROR FuseClient::FuseOpInit(void *userdata,
178442
<< ", mountPoint = " << mountpoint_.ShortDebugString();
179443
return CURVEFS_ERROR::MOUNT_FAILED;
180444
}
181-
182445
inodeManager_->SetFsId(fsInfo_->fsid());
183446
dentryManager_->SetFsId(fsInfo_->fsid());
184447
enableSumInDir_ = fsInfo_->enablesumindir() && !FLAGS_enableCto;
@@ -196,7 +459,7 @@ CURVEFS_ERROR FuseClient::FuseOpInit(void *userdata,
196459
}
197460

198461
init_ = true;
199-
462+
mounted_.store(true, std::memory_order_release);
200463
return CURVEFS_ERROR::OK;
201464
}
202465

@@ -318,7 +581,6 @@ CURVEFS_ERROR FuseClient::FuseOpOpen(fuse_req_t req, fuse_ino_t ino,
318581
<< ", inodeid = " << ino;
319582
return ret;
320583
}
321-
322584
::curve::common::UniqueLock lgGuard = inodeWrapper->GetUniqueLock();
323585
if (fi->flags & O_TRUNC) {
324586
if (fi->flags & O_WRONLY || fi->flags & O_RDWR) {
@@ -796,7 +1058,6 @@ CURVEFS_ERROR FuseClient::FuseOpSetAttr(fuse_req_t req, fuse_ino_t ino,
7961058
<< ", inodeid = " << ino;
7971059
return ret;
7981060
}
799-
8001061
::curve::common::UniqueLock lgGuard = inodeWrapper->GetUniqueLock();
8011062
Inode *inode = inodeWrapper->GetMutableInodeUnlocked();
8021063

0 commit comments

Comments
 (0)