Skip to content

Commit 8953e23

Browse files
committed
curvefs/client: add feature of warmup
1 parent 207f26b commit 8953e23

9 files changed

+1276
-20
lines changed

curvefs/src/client/fuse_client.cpp

+246-6
Original file line numberDiff line numberDiff line change
@@ -122,17 +122,260 @@ CURVEFS_ERROR FuseClient::Init(const FuseClientOption &option) {
122122
if (ret3 != CURVEFS_ERROR::OK) {
123123
return ret3;
124124
}
125-
126125
ret3 =
127126
dentryManager_->Init(option.dCacheLruSize, option.enableDCacheMetrics);
128127
if (ret3 != CURVEFS_ERROR::OK) {
129128
return ret3;
130129
}
131-
130+
warmUpFile_.exist = false;
131+
bgCmdTaskStop_.store(false, std::memory_order_release);
132+
bgCmdTaskThread_ = Thread(&FuseClient::BackGroundCmdTask, this);
133+
bgCmdStop_.store(false, std::memory_order_release);
134+
bgCmdThread_ = Thread(&FuseClient::BackGroundCmd, this);
135+
taskFetchMetaPool_.Start(WARMUP_THREADS);
132136
return ret3;
133137
}
134138

139+
// we will use it to contorl warmtaskqueue(curd) later
140+
void FuseClient::BackGroundCmd() {
141+
while (!mounted_.load(std::memory_order_acquire)) {
142+
usleep(WARMUP_CHECKINTERVAL_US);
143+
VLOG(6) << "wait mount success.";
144+
continue;
145+
}
146+
while (!bgCmdStop_.load(std::memory_order_acquire)) {
147+
usleep(WARMUP_CHECKINTERVAL_US); // check interval
148+
}
149+
return;
150+
}
151+
152+
void FuseClient::BackGroundCmdTask() {
153+
while (!mounted_.load(std::memory_order_acquire)) {
154+
usleep(WARMUP_CHECKINTERVAL_US);
155+
VLOG(6) << "wait mount success.";
156+
continue;
157+
}
158+
while (!bgCmdStop_.load(std::memory_order_acquire)) {
159+
std::list<std::string> readAheadPaths;
160+
WaitWarmUp();
161+
while (hasWarmTask()) {
162+
std::string warmUpTask;
163+
GetarmTask(&warmUpTask);
164+
if (warmUpTask.empty()) {
165+
continue;
166+
}
167+
VLOG(9) << "warmup task is: " << warmUpTask;
168+
std::string pDelimiter = "/";
169+
char* pToken = nullptr;
170+
char* pSave = nullptr;
171+
pToken = strtok_r(const_cast<char*>(warmUpTask.c_str()),
172+
const_cast<char*>(pDelimiter.c_str()), &pSave);
173+
if (nullptr == pToken) {
174+
VLOG(3) << "warmUpTask nullptr";
175+
continue;
176+
}
177+
Dentry dentry;
178+
CURVEFS_ERROR ret = dentryManager_->GetDentry(
179+
fsInfo_->rootinodeid(), pToken, &dentry);
180+
if (ret != CURVEFS_ERROR::OK) {
181+
if (ret != CURVEFS_ERROR::NOTEXIST) {
182+
LOG(WARNING) << "dentryManager_ get dentry fail: "
183+
<< ret << ", name: " << warmUpTask;
184+
}
185+
LOG(WARNING) << "FetchDentry error: " << ret
186+
<< ", name: " << warmUpTask;
187+
return;
188+
}
189+
if (FsFileType::TYPE_S3 != dentry.type()) {
190+
LOG(WARNING) << "not a file: " << warmUpTask
191+
<< "type is: " << dentry.type();
192+
return;
193+
}
194+
fuse_ino_t ino = dentry.inodeid();
195+
std::shared_ptr<InodeWrapper> inodeWrapper;
196+
ret = inodeManager_->GetInode(ino, inodeWrapper);
197+
if (ret != CURVEFS_ERROR::OK) {
198+
LOG(ERROR) << "inodeManager get inode fail, ret = "
199+
<< ret << ", inodeid = " << ino;
200+
return;
201+
}
202+
uint64_t len = inodeWrapper->GetLength();
203+
VLOG(9) << "ino is: " << ino << ", len is: " << len;
204+
WarmUpFileContext_t warmUpFile{ino, len, true};
205+
SetWarmUpFile(warmUpFile);
206+
}
207+
}
208+
}
209+
210+
void FuseClient::FetchDentryEnqueue(std::string file) {
211+
VLOG(9) << "FetchDentryEnqueue start: " << file;
212+
auto task = [this, file]() {
213+
LookPath(file);
214+
};
215+
taskFetchMetaPool_.Enqueue(task);
216+
}
217+
218+
void FuseClient::LookPath(std::string file) {
219+
VLOG(9) << "LookPath start: " << file;
220+
std::vector<std::string> splitPath;
221+
// remove enter, newline, blank
222+
std::string blanks("\r\n ");
223+
file.erase(0, file.find_first_not_of(blanks));
224+
file.erase(file.find_last_not_of(blanks) + 1);
225+
if (file.empty()) {
226+
VLOG(9) << "empty path";
227+
return;
228+
}
229+
bool isRoot = false;
230+
if (file == "/") {
231+
splitPath.push_back(file);
232+
isRoot = true;
233+
} else {
234+
std::string pDelimiter = "/";
235+
char* pToken = nullptr;
236+
char* pSave = nullptr;
237+
pToken = strtok_r(const_cast<char*>(file.c_str()),
238+
const_cast<char*>(pDelimiter.c_str()), &pSave);
239+
if (nullptr == pToken) {
240+
VLOG(3) << "lookpath end";
241+
return;
242+
}
243+
splitPath.push_back(pToken);
244+
while (true) {
245+
pToken = strtok_r(NULL, const_cast<char*>(
246+
pDelimiter.c_str()), &pSave);
247+
if (nullptr == pToken) {
248+
VLOG(3) << "lookpath end";
249+
break;
250+
}
251+
VLOG(9) << "pToken is:" << pToken
252+
<< "pSave:" << pSave;
253+
splitPath.push_back(pToken);
254+
}
255+
}
256+
VLOG(6) << "splitPath size is: " << splitPath.size();
257+
if (splitPath.size() == 1 && isRoot) {
258+
VLOG(9) << "i am root";
259+
FetchChildDentryEnqueue(fsInfo_->rootinodeid());
260+
return;
261+
} else if (splitPath.size() == 1) {
262+
VLOG(9) << "parent is root: " << fsInfo_->rootinodeid()
263+
<< ", path is: " << splitPath[0];
264+
this->FetchDentry(fsInfo_->rootinodeid(), splitPath[0]);
265+
return;
266+
} else if (splitPath.size() > 1) { // travel path
267+
VLOG(9) << "traverse path start: " << splitPath.size();
268+
std::string lastName = splitPath.back();
269+
splitPath.pop_back();
270+
fuse_ino_t ino = fsInfo_->rootinodeid();
271+
for (auto iter : splitPath) {
272+
VLOG(9) << "traverse path: " << iter
273+
<< "ino is: " << ino;
274+
Dentry dentry;
275+
std::string pathName = iter;
276+
CURVEFS_ERROR ret = dentryManager_->GetDentry(
277+
ino, pathName, &dentry);
278+
if (ret != CURVEFS_ERROR::OK) {
279+
if (ret != CURVEFS_ERROR::NOTEXIST) {
280+
LOG(WARNING) << "dentryManager_ get dentry fail, ret = "
281+
<< ret << ", parent inodeid = " << ino
282+
<< ", name = " << file;
283+
}
284+
VLOG(9) << "whs FetchDentry error: " << ret;
285+
return;
286+
}
287+
ino = dentry.inodeid();
288+
}
289+
this->FetchDentry(ino, lastName);
290+
VLOG(9) << "ino is: " << ino
291+
<< "lastname is: " << lastName;
292+
return;
293+
} else {
294+
VLOG(3) << "unknown path";
295+
}
296+
return;
297+
}
298+
299+
void FuseClient::FetchChildDentryEnqueue(fuse_ino_t ino) {
300+
auto task = [this, ino]() {
301+
// resolve层层递进,获得inode
302+
this->FetchChildDentry(ino);
303+
};
304+
taskFetchMetaPool_.Enqueue(task);
305+
}
306+
307+
void FuseClient::FetchChildDentry(fuse_ino_t ino) {
308+
VLOG(9) << "FetchChildDentry start: " << ino;
309+
std::list<Dentry> dentryList;
310+
auto limit = option_.listDentryLimit;
311+
CURVEFS_ERROR ret = dentryManager_->ListDentry(
312+
ino, &dentryList, limit);
313+
if (ret != CURVEFS_ERROR::OK) {
314+
LOG(ERROR) << "dentryManager_ ListDentry fail, ret = " << ret
315+
<< ", parent = " << ino;
316+
return;
317+
}
318+
for (auto iter : dentryList) {
319+
VLOG(9) << "FetchChildDentry: " << iter.name();
320+
if (FsFileType::TYPE_S3 == iter.type()) {
321+
std::unique_lock<std::mutex> lck(fetchMtx_);
322+
readAheadFiles_.push_front(iter.inodeid());
323+
VLOG(9) << "FetchChildDentry: " << iter.inodeid();;
324+
} else if (FsFileType::TYPE_DIRECTORY == iter.type()) {
325+
FetchChildDentryEnqueue(iter.inodeid());
326+
VLOG(9) << "FetchChildDentry: " << iter.inodeid();
327+
} else if (FsFileType::TYPE_SYM_LINK == iter.type()) { // need todo
328+
} else {
329+
VLOG(9) << "unknown type";
330+
}
331+
}
332+
return;
333+
}
334+
335+
void FuseClient::FetchDentry(fuse_ino_t ino, std::string file) {
336+
VLOG(9) << "FetchDentry start: " << file
337+
<< ", ino: " << ino;
338+
Dentry dentry;
339+
CURVEFS_ERROR ret = dentryManager_->GetDentry(ino, file, &dentry);
340+
if (ret != CURVEFS_ERROR::OK) {
341+
if (ret != CURVEFS_ERROR::NOTEXIST) {
342+
LOG(WARNING) << "dentryManager_ get dentry fail, ret = " << ret
343+
<< ", parent inodeid = " << ino
344+
<< ", name = " << file;
345+
}
346+
VLOG(1) << "FetchDentry error: " << ret;
347+
return;
348+
}
349+
if (FsFileType::TYPE_S3 == dentry.type()) {
350+
std::unique_lock<std::mutex> lck(fetchMtx_);
351+
readAheadFiles_.push_front(dentry.inodeid());
352+
return;
353+
} else if (FsFileType::TYPE_DIRECTORY == dentry.type()) {
354+
FetchChildDentryEnqueue(dentry.inodeid());
355+
VLOG(9) << "FetchDentry: " << dentry.inodeid();
356+
return;
357+
358+
} else if (FsFileType::TYPE_SYM_LINK == dentry.type()) {
359+
} else {
360+
VLOG(3) << "unkown, file: " << file
361+
<< ", ino: " << ino;
362+
}
363+
VLOG(9) << "FetchDentry end: " << file
364+
<< ", ino: " << ino;
365+
return;
366+
}
367+
135368
void FuseClient::UnInit() {
369+
bgCmdTaskStop_.store(true, std::memory_order_release);
370+
bgCmdStop_.store(true, std::memory_order_release);
371+
WarmUpRun();
372+
if (bgCmdTaskThread_.joinable()) {
373+
bgCmdTaskThread_.join();
374+
}
375+
if (bgCmdThread_.joinable()) {
376+
bgCmdThread_.join();
377+
}
378+
taskFetchMetaPool_.Stop();
136379
delete mdsBase_;
137380
mdsBase_ = nullptr;
138381
}
@@ -178,7 +421,6 @@ CURVEFS_ERROR FuseClient::FuseOpInit(void *userdata,
178421
<< ", mountPoint = " << mountpoint_.ShortDebugString();
179422
return CURVEFS_ERROR::MOUNT_FAILED;
180423
}
181-
182424
inodeManager_->SetFsId(fsInfo_->fsid());
183425
dentryManager_->SetFsId(fsInfo_->fsid());
184426
enableSumInDir_ = fsInfo_->enablesumindir() && !FLAGS_enableCto;
@@ -196,7 +438,7 @@ CURVEFS_ERROR FuseClient::FuseOpInit(void *userdata,
196438
}
197439

198440
init_ = true;
199-
441+
mounted_.store(true, std::memory_order_release);
200442
return CURVEFS_ERROR::OK;
201443
}
202444

@@ -318,7 +560,6 @@ CURVEFS_ERROR FuseClient::FuseOpOpen(fuse_req_t req, fuse_ino_t ino,
318560
<< ", inodeid = " << ino;
319561
return ret;
320562
}
321-
322563
::curve::common::UniqueLock lgGuard = inodeWrapper->GetUniqueLock();
323564
if (fi->flags & O_TRUNC) {
324565
if (fi->flags & O_WRONLY || fi->flags & O_RDWR) {
@@ -796,7 +1037,6 @@ CURVEFS_ERROR FuseClient::FuseOpSetAttr(fuse_req_t req, fuse_ino_t ino,
7961037
<< ", inodeid = " << ino;
7971038
return ret;
7981039
}
799-
8001040
::curve::common::UniqueLock lgGuard = inodeWrapper->GetUniqueLock();
8011041
Inode *inode = inodeWrapper->GetMutableInodeUnlocked();
8021042

0 commit comments

Comments
 (0)