Skip to content

Commit 457d5d0

Browse files
committed
curvefs/client: add feature of warmup
1 parent 207f26b commit 457d5d0

File tree

7 files changed

+1266
-17
lines changed

7 files changed

+1266
-17
lines changed

curvefs/src/client/fuse_client.cpp

+265-6
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,15 @@ using rpcclient::Cli2ClientImpl;
7575
using rpcclient::MetaCache;
7676
using common::FLAGS_enableCto;
7777

78+
static bool checkWarmupListPath(const char*, const std::string& target) {
79+
// do something to check the path
80+
LOG(INFO) << "warmupListPath: " << target;
81+
return true;
82+
}
83+
DEFINE_string(warmupListPath, "",
84+
"the path to the list of files (dirs) that need to warmup.");
85+
DEFINE_validator(warmupListPath, checkWarmupListPath);
86+
7887
CURVEFS_ERROR FuseClient::Init(const FuseClientOption &option) {
7988
option_ = option;
8089

@@ -122,17 +131,270 @@ CURVEFS_ERROR FuseClient::Init(const FuseClientOption &option) {
122131
if (ret3 != CURVEFS_ERROR::OK) {
123132
return ret3;
124133
}
125-
126134
ret3 =
127135
dentryManager_->Init(option.dCacheLruSize, option.enableDCacheMetrics);
128136
if (ret3 != CURVEFS_ERROR::OK) {
129137
return ret3;
130138
}
131-
139+
warmUpFile_.exist = false;
140+
bgCmdTaskStop_.store(false, std::memory_order_release);
141+
bgCmdTaskThread_ = Thread(&FuseClient::BackGroundCmdTask, this);
142+
bgCmdStop_.store(false, std::memory_order_release);
143+
bgCmdThread_ = Thread(&FuseClient::BackGroundCmd, this);
144+
FLAGS_warmupListPath = "";
145+
taskFetchMetaPool_.Start(WARMUP_THREADS);
132146
return ret3;
133147
}
134148

149+
void FuseClient::BackGroundCmd() {
150+
while (!mounted_.load(std::memory_order_acquire)) {
151+
usleep(WARMUP_CHECKINTERVAL_US);
152+
VLOG(6) << "wait mount success.";
153+
continue;
154+
}
155+
std::string preWarmUpPath = FLAGS_warmupListPath;
156+
std::string warmUpPath;
157+
while (!bgCmdStop_.load(std::memory_order_acquire)) {
158+
warmUpPath = FLAGS_warmupListPath;
159+
if (warmUpPath == preWarmUpPath) {
160+
usleep(WARMUP_CHECKINTERVAL_US); // check interval
161+
continue;
162+
}
163+
VLOG(6) << "has new warmUp task: " << warmUpPath;
164+
preWarmUpPath = warmUpPath;
165+
PutWarmTask(warmUpPath);
166+
WarmUpRun();
167+
}
168+
return;
169+
}
170+
171+
void FuseClient::BackGroundCmdTask() {
172+
while (!mounted_.load(std::memory_order_acquire)) {
173+
usleep(WARMUP_CHECKINTERVAL_US);
174+
VLOG(6) << "wait mount success.";
175+
continue;
176+
}
177+
while (!bgCmdStop_.load(std::memory_order_acquire)) {
178+
std::list<std::string> readAheadPaths;
179+
WaitWarmUp();
180+
while (hasWarmTask()) {
181+
std::string warmUpTask;
182+
GetarmTask(&warmUpTask);
183+
if (warmUpTask.empty()) {
184+
continue;
185+
}
186+
VLOG(6) << "warmup task is: " << warmUpTask;
187+
std::string pDelimiter = "/";
188+
char* pToken = nullptr;
189+
char* pSave = nullptr;
190+
pToken = strtok_r(const_cast<char*>(warmUpTask.c_str()),
191+
const_cast<char*>(pDelimiter.c_str()), &pSave);
192+
if (nullptr == pToken) {
193+
VLOG(3) << "warmUpTask nullptr";
194+
continue;
195+
}
196+
Dentry dentry;
197+
CURVEFS_ERROR ret = dentryManager_->GetDentry(
198+
fsInfo_->rootinodeid(), pToken, &dentry);
199+
if (ret != CURVEFS_ERROR::OK) {
200+
if (ret != CURVEFS_ERROR::NOTEXIST) {
201+
LOG(WARNING) << "dentryManager_ get dentry fail: "
202+
<< ret << ", name: " << warmUpTask;
203+
}
204+
LOG(WARNING) << "FetchDentry error: " << ret
205+
<< ", name: " << warmUpTask;
206+
return;
207+
}
208+
if (FsFileType::TYPE_S3 != dentry.type()) {
209+
LOG(WARNING) << "not a file: " << warmUpTask
210+
<< "type is: " << dentry.type();
211+
return;
212+
}
213+
fuse_ino_t ino = dentry.inodeid();
214+
std::shared_ptr<InodeWrapper> inodeWrapper;
215+
ret = inodeManager_->GetInode(ino, inodeWrapper);
216+
if (ret != CURVEFS_ERROR::OK) {
217+
LOG(ERROR) << "inodeManager get inode fail, ret = "
218+
<< ret << ", inodeid = " << ino;
219+
return;
220+
}
221+
uint64_t len = inodeWrapper->GetLength();
222+
VLOG(9) << "ino is: " << ino << ", len is: " << len;
223+
WarmUpFileContext_t warmUpFile{ino, len, true};
224+
SetWarmUpFile(warmUpFile);
225+
}
226+
}
227+
}
228+
229+
void FuseClient::FetchDentryEnqueue(std::string file) {
230+
VLOG(6) << "FetchDentryEnqueue start: " << file;
231+
auto task = [this, file]() {
232+
LookPath(file);
233+
};
234+
taskFetchMetaPool_.Enqueue(task);
235+
}
236+
237+
void FuseClient::LookPath(std::string file) {
238+
VLOG(6) << "LookPath start: " << file;
239+
std::vector<std::string> splitPath;
240+
// remove enter, newline, blank
241+
std::string blanks("\r\n ");
242+
file.erase(0, file.find_first_not_of(blanks));
243+
file.erase(file.find_last_not_of(blanks) + 1);
244+
if (file.empty()) {
245+
VLOG(0) << "empty path";
246+
return;
247+
}
248+
bool isRoot = false;
249+
if (file == "/") {
250+
splitPath.push_back(file);
251+
isRoot = true;
252+
} else {
253+
std::string pDelimiter = "/";
254+
char* pToken = nullptr;
255+
char* pSave = nullptr;
256+
pToken = strtok_r(const_cast<char*>(file.c_str()),
257+
const_cast<char*>(pDelimiter.c_str()), &pSave);
258+
if (nullptr == pToken) {
259+
VLOG(3) << "lookpath end";
260+
return;
261+
}
262+
splitPath.push_back(pToken);
263+
while (true) {
264+
pToken = strtok_r(NULL, const_cast<char*>(
265+
pDelimiter.c_str()), &pSave);
266+
if (nullptr == pToken) {
267+
VLOG(3) << "lookpath end";
268+
break;
269+
}
270+
VLOG(9) << "pToken is:" << pToken
271+
<< "pSave:" << pSave;
272+
splitPath.push_back(pToken);
273+
}
274+
}
275+
VLOG(3) << "splitPath size is: " << splitPath.size();
276+
if (splitPath.size() == 1 && isRoot) {
277+
VLOG(9) << "i am root";
278+
FetchChildDentryEnqueue(fsInfo_->rootinodeid());
279+
return;
280+
} else if (splitPath.size() == 1) {
281+
VLOG(9) << "parent is root: " << fsInfo_->rootinodeid()
282+
<< ", path is: " << splitPath[0];
283+
this->FetchDentry(fsInfo_->rootinodeid(), splitPath[0]);
284+
return;
285+
} else if (splitPath.size() > 1) { // travel path
286+
VLOG(9) << "traverse path start: " << splitPath.size();
287+
std::string lastName = splitPath.back();
288+
splitPath.pop_back();
289+
fuse_ino_t ino = fsInfo_->rootinodeid();
290+
for (auto iter : splitPath) {
291+
VLOG(9) << "traverse path: " << iter
292+
<< "ino is: " << ino;
293+
Dentry dentry;
294+
std::string pathName = iter;
295+
CURVEFS_ERROR ret = dentryManager_->GetDentry(
296+
ino, pathName, &dentry);
297+
if (ret != CURVEFS_ERROR::OK) {
298+
if (ret != CURVEFS_ERROR::NOTEXIST) {
299+
LOG(WARNING) << "dentryManager_ get dentry fail, ret = "
300+
<< ret << ", parent inodeid = " << ino
301+
<< ", name = " << file;
302+
}
303+
VLOG(0) << "whs FetchDentry error: " << ret;
304+
return;
305+
}
306+
ino = dentry.inodeid();
307+
}
308+
this->FetchDentry(ino, lastName);
309+
VLOG(9) << "ino is: " << ino
310+
<< "lastname is: " << lastName;
311+
return;
312+
} else {
313+
VLOG(0) << "unknown path";
314+
}
315+
return;
316+
}
317+
318+
void FuseClient::FetchChildDentryEnqueue(fuse_ino_t ino) {
319+
auto task = [this, ino]() {
320+
// resolve层层递进,获得inode
321+
this->FetchChildDentry(ino);
322+
};
323+
taskFetchMetaPool_.Enqueue(task);
324+
}
325+
326+
void FuseClient::FetchChildDentry(fuse_ino_t ino) {
327+
VLOG(9) << "FetchChildDentry start: " << ino;
328+
std::list<Dentry> dentryList;
329+
auto limit = option_.listDentryLimit;
330+
CURVEFS_ERROR ret = dentryManager_->ListDentry(
331+
ino, &dentryList, limit);
332+
if (ret != CURVEFS_ERROR::OK) {
333+
LOG(ERROR) << "dentryManager_ ListDentry fail, ret = " << ret
334+
<< ", parent = " << ino;
335+
return;
336+
}
337+
for (auto iter : dentryList) {
338+
VLOG(9) << "FetchChildDentry: " << iter.name();
339+
if (FsFileType::TYPE_S3 == iter.type()) {
340+
std::unique_lock<std::mutex> lck(fetchMtx_);
341+
readAheadFiles_.push_front(iter.inodeid());
342+
VLOG(9) << "FetchChildDentry: " << iter.inodeid();;
343+
} else if (FsFileType::TYPE_DIRECTORY == iter.type()) {
344+
FetchChildDentryEnqueue(iter.inodeid());
345+
VLOG(9) << "FetchChildDentry: " << iter.inodeid();
346+
} else if (FsFileType::TYPE_SYM_LINK == iter.type()) { // need todo
347+
} else {
348+
VLOG(0) << "unknown type";
349+
}
350+
}
351+
return;
352+
}
353+
354+
void FuseClient::FetchDentry(fuse_ino_t ino, std::string file) {
355+
VLOG(9) << "FetchDentry start: " << file
356+
<< ", ino: " << ino;
357+
Dentry dentry;
358+
CURVEFS_ERROR ret = dentryManager_->GetDentry(ino, file, &dentry);
359+
if (ret != CURVEFS_ERROR::OK) {
360+
if (ret != CURVEFS_ERROR::NOTEXIST) {
361+
LOG(WARNING) << "dentryManager_ get dentry fail, ret = " << ret
362+
<< ", parent inodeid = " << ino
363+
<< ", name = " << file;
364+
}
365+
VLOG(1) << "FetchDentry error: " << ret;
366+
return;
367+
}
368+
if (FsFileType::TYPE_S3 == dentry.type()) {
369+
std::unique_lock<std::mutex> lck(fetchMtx_);
370+
readAheadFiles_.push_front(dentry.inodeid());
371+
return;
372+
} else if (FsFileType::TYPE_DIRECTORY == dentry.type()) {
373+
FetchChildDentryEnqueue(dentry.inodeid());
374+
VLOG(9) << "FetchDentry: " << dentry.inodeid();
375+
return;
376+
377+
} else if (FsFileType::TYPE_SYM_LINK == dentry.type()) {
378+
} else {
379+
VLOG(0) << "unkown, file: " << file
380+
<< ", ino: " << ino;
381+
}
382+
VLOG(9) << "FetchDentry end: " << file
383+
<< ", ino: " << ino;
384+
return;
385+
}
386+
135387
void FuseClient::UnInit() {
388+
bgCmdTaskStop_.store(true, std::memory_order_release);
389+
bgCmdStop_.store(true, std::memory_order_release);
390+
WarmUpRun();
391+
if (bgCmdTaskThread_.joinable()) {
392+
bgCmdTaskThread_.join();
393+
}
394+
if (bgCmdThread_.joinable()) {
395+
bgCmdThread_.join();
396+
}
397+
taskFetchMetaPool_.Stop();
136398
delete mdsBase_;
137399
mdsBase_ = nullptr;
138400
}
@@ -178,7 +440,6 @@ CURVEFS_ERROR FuseClient::FuseOpInit(void *userdata,
178440
<< ", mountPoint = " << mountpoint_.ShortDebugString();
179441
return CURVEFS_ERROR::MOUNT_FAILED;
180442
}
181-
182443
inodeManager_->SetFsId(fsInfo_->fsid());
183444
dentryManager_->SetFsId(fsInfo_->fsid());
184445
enableSumInDir_ = fsInfo_->enablesumindir() && !FLAGS_enableCto;
@@ -196,7 +457,7 @@ CURVEFS_ERROR FuseClient::FuseOpInit(void *userdata,
196457
}
197458

198459
init_ = true;
199-
460+
mounted_.store(true, std::memory_order_release);
200461
return CURVEFS_ERROR::OK;
201462
}
202463

@@ -318,7 +579,6 @@ CURVEFS_ERROR FuseClient::FuseOpOpen(fuse_req_t req, fuse_ino_t ino,
318579
<< ", inodeid = " << ino;
319580
return ret;
320581
}
321-
322582
::curve::common::UniqueLock lgGuard = inodeWrapper->GetUniqueLock();
323583
if (fi->flags & O_TRUNC) {
324584
if (fi->flags & O_WRONLY || fi->flags & O_RDWR) {
@@ -796,7 +1056,6 @@ CURVEFS_ERROR FuseClient::FuseOpSetAttr(fuse_req_t req, fuse_ino_t ino,
7961056
<< ", inodeid = " << ino;
7971057
return ret;
7981058
}
799-
8001059
::curve::common::UniqueLock lgGuard = inodeWrapper->GetUniqueLock();
8011060
Inode *inode = inodeWrapper->GetMutableInodeUnlocked();
8021061

0 commit comments

Comments
 (0)