Skip to content

Commit 6ae1335

Browse files
committed
curvefs/client: add feature of warmup
1 parent 207f26b commit 6ae1335

9 files changed

+1246
-19
lines changed

curvefs/src/client/fuse_client.cpp

+227-6
Original file line numberDiff line numberDiff line change
@@ -122,17 +122,241 @@ CURVEFS_ERROR FuseClient::Init(const FuseClientOption &option) {
122122
if (ret3 != CURVEFS_ERROR::OK) {
123123
return ret3;
124124
}
125-
126125
ret3 =
127126
dentryManager_->Init(option.dCacheLruSize, option.enableDCacheMetrics);
128127
if (ret3 != CURVEFS_ERROR::OK) {
129128
return ret3;
130129
}
131-
130+
warmUpFile_.exist = false;
131+
bgCmdStop_.store(false, std::memory_order_release);
132+
bgCmdTaskThread_ = Thread(&FuseClient::BackGroundCmdTask, this);
133+
taskFetchMetaPool_.Start(WARMUP_THREADS);
132134
return ret3;
133135
}
134136

137+
void FuseClient::BackGroundCmdTask() {
138+
while (!mounted_.load(std::memory_order_acquire)) {
139+
usleep(WARMUP_CHECKINTERVAL_US);
140+
VLOG(6) << "wait mount success.";
141+
continue;
142+
}
143+
while (!bgCmdStop_.load(std::memory_order_acquire)) {
144+
std::list<std::string> readAheadPaths;
145+
WaitWarmUp();
146+
while (hasWarmTask()) {
147+
std::string warmUpTask;
148+
GetarmTask(&warmUpTask);
149+
if (warmUpTask.empty()) {
150+
continue;
151+
}
152+
VLOG(9) << "warmup task is: " << warmUpTask;
153+
std::string pDelimiter = "/";
154+
char* pToken = nullptr;
155+
char* pSave = nullptr;
156+
pToken = strtok_r(const_cast<char*>(warmUpTask.c_str()),
157+
const_cast<char*>(pDelimiter.c_str()), &pSave);
158+
if (nullptr == pToken) {
159+
VLOG(3) << "warmUpTask nullptr";
160+
continue;
161+
}
162+
Dentry dentry;
163+
CURVEFS_ERROR ret = dentryManager_->GetDentry(
164+
fsInfo_->rootinodeid(), pToken, &dentry);
165+
if (ret != CURVEFS_ERROR::OK) {
166+
if (ret != CURVEFS_ERROR::NOTEXIST) {
167+
LOG(WARNING) << "dentryManager_ get dentry fail: "
168+
<< ret << ", name: " << warmUpTask;
169+
}
170+
LOG(WARNING) << "FetchDentry error: " << ret
171+
<< ", name: " << warmUpTask;
172+
return;
173+
}
174+
if (FsFileType::TYPE_S3 != dentry.type()) {
175+
LOG(WARNING) << "not a file: " << warmUpTask
176+
<< "type is: " << dentry.type();
177+
return;
178+
}
179+
fuse_ino_t ino = dentry.inodeid();
180+
std::shared_ptr<InodeWrapper> inodeWrapper;
181+
ret = inodeManager_->GetInode(ino, inodeWrapper);
182+
if (ret != CURVEFS_ERROR::OK) {
183+
LOG(ERROR) << "inodeManager get inode fail, ret = "
184+
<< ret << ", inodeid = " << ino;
185+
return;
186+
}
187+
uint64_t len = inodeWrapper->GetLength();
188+
VLOG(9) << "ino is: " << ino << ", len is: " << len;
189+
WarmUpFileContext_t warmUpFile{ino, len, true};
190+
SetWarmUpFile(warmUpFile);
191+
}
192+
}
193+
}
194+
195+
void FuseClient::FetchDentryEnqueue(std::string file) {
196+
VLOG(9) << "FetchDentryEnqueue start: " << file;
197+
auto task = [this, file]() {
198+
LookPath(file);
199+
};
200+
taskFetchMetaPool_.Enqueue(task);
201+
}
202+
203+
void FuseClient::LookPath(std::string file) {
204+
VLOG(9) << "LookPath start: " << file;
205+
std::vector<std::string> splitPath;
206+
// remove enter, newline, blank
207+
std::string blanks("\r\n ");
208+
file.erase(0, file.find_first_not_of(blanks));
209+
file.erase(file.find_last_not_of(blanks) + 1);
210+
if (file.empty()) {
211+
VLOG(9) << "empty path";
212+
return;
213+
}
214+
bool isRoot = false;
215+
if (file == "/") {
216+
splitPath.push_back(file);
217+
isRoot = true;
218+
} else {
219+
std::string pDelimiter = "/";
220+
char* pToken = nullptr;
221+
char* pSave = nullptr;
222+
pToken = strtok_r(const_cast<char*>(file.c_str()),
223+
const_cast<char*>(pDelimiter.c_str()), &pSave);
224+
if (nullptr == pToken) {
225+
VLOG(3) << "lookpath end";
226+
return;
227+
}
228+
splitPath.push_back(pToken);
229+
while (true) {
230+
pToken = strtok_r(NULL, const_cast<char*>(
231+
pDelimiter.c_str()), &pSave);
232+
if (nullptr == pToken) {
233+
VLOG(3) << "lookpath end";
234+
break;
235+
}
236+
VLOG(9) << "pToken is:" << pToken
237+
<< "pSave:" << pSave;
238+
splitPath.push_back(pToken);
239+
}
240+
}
241+
VLOG(6) << "splitPath size is: " << splitPath.size();
242+
if (splitPath.size() == 1 && isRoot) {
243+
VLOG(9) << "i am root";
244+
FetchChildDentryEnqueue(fsInfo_->rootinodeid());
245+
return;
246+
} else if (splitPath.size() == 1) {
247+
VLOG(9) << "parent is root: " << fsInfo_->rootinodeid()
248+
<< ", path is: " << splitPath[0];
249+
this->FetchDentry(fsInfo_->rootinodeid(), splitPath[0]);
250+
return;
251+
} else if (splitPath.size() > 1) { // travel path
252+
VLOG(9) << "traverse path start: " << splitPath.size();
253+
std::string lastName = splitPath.back();
254+
splitPath.pop_back();
255+
fuse_ino_t ino = fsInfo_->rootinodeid();
256+
for (auto iter : splitPath) {
257+
VLOG(9) << "traverse path: " << iter
258+
<< "ino is: " << ino;
259+
Dentry dentry;
260+
std::string pathName = iter;
261+
CURVEFS_ERROR ret = dentryManager_->GetDentry(
262+
ino, pathName, &dentry);
263+
if (ret != CURVEFS_ERROR::OK) {
264+
if (ret != CURVEFS_ERROR::NOTEXIST) {
265+
LOG(WARNING) << "dentryManager_ get dentry fail, ret = "
266+
<< ret << ", parent inodeid = " << ino
267+
<< ", name = " << file;
268+
}
269+
VLOG(9) << "whs FetchDentry error: " << ret;
270+
return;
271+
}
272+
ino = dentry.inodeid();
273+
}
274+
this->FetchDentry(ino, lastName);
275+
VLOG(9) << "ino is: " << ino
276+
<< "lastname is: " << lastName;
277+
return;
278+
} else {
279+
VLOG(3) << "unknown path";
280+
}
281+
return;
282+
}
283+
284+
void FuseClient::FetchChildDentryEnqueue(fuse_ino_t ino) {
285+
auto task = [this, ino]() {
286+
// resolve层层递进,获得inode
287+
this->FetchChildDentry(ino);
288+
};
289+
taskFetchMetaPool_.Enqueue(task);
290+
}
291+
292+
void FuseClient::FetchChildDentry(fuse_ino_t ino) {
293+
VLOG(9) << "FetchChildDentry start: " << ino;
294+
std::list<Dentry> dentryList;
295+
auto limit = option_.listDentryLimit;
296+
CURVEFS_ERROR ret = dentryManager_->ListDentry(
297+
ino, &dentryList, limit);
298+
if (ret != CURVEFS_ERROR::OK) {
299+
LOG(ERROR) << "dentryManager_ ListDentry fail, ret = " << ret
300+
<< ", parent = " << ino;
301+
return;
302+
}
303+
for (auto iter : dentryList) {
304+
VLOG(9) << "FetchChildDentry: " << iter.name();
305+
if (FsFileType::TYPE_S3 == iter.type()) {
306+
std::unique_lock<std::mutex> lck(fetchMtx_);
307+
readAheadFiles_.push_front(iter.inodeid());
308+
VLOG(9) << "FetchChildDentry: " << iter.inodeid();;
309+
} else if (FsFileType::TYPE_DIRECTORY == iter.type()) {
310+
FetchChildDentryEnqueue(iter.inodeid());
311+
VLOG(9) << "FetchChildDentry: " << iter.inodeid();
312+
} else if (FsFileType::TYPE_SYM_LINK == iter.type()) { // need todo
313+
} else {
314+
VLOG(9) << "unknown type";
315+
}
316+
}
317+
return;
318+
}
319+
320+
void FuseClient::FetchDentry(fuse_ino_t ino, std::string file) {
321+
VLOG(9) << "FetchDentry start: " << file
322+
<< ", ino: " << ino;
323+
Dentry dentry;
324+
CURVEFS_ERROR ret = dentryManager_->GetDentry(ino, file, &dentry);
325+
if (ret != CURVEFS_ERROR::OK) {
326+
if (ret != CURVEFS_ERROR::NOTEXIST) {
327+
LOG(WARNING) << "dentryManager_ get dentry fail, ret = " << ret
328+
<< ", parent inodeid = " << ino
329+
<< ", name = " << file;
330+
}
331+
VLOG(1) << "FetchDentry error: " << ret;
332+
return;
333+
}
334+
if (FsFileType::TYPE_S3 == dentry.type()) {
335+
std::unique_lock<std::mutex> lck(fetchMtx_);
336+
readAheadFiles_.push_front(dentry.inodeid());
337+
return;
338+
} else if (FsFileType::TYPE_DIRECTORY == dentry.type()) {
339+
FetchChildDentryEnqueue(dentry.inodeid());
340+
VLOG(9) << "FetchDentry: " << dentry.inodeid();
341+
return;
342+
343+
} else if (FsFileType::TYPE_SYM_LINK == dentry.type()) {
344+
} else {
345+
VLOG(3) << "unkown, file: " << file
346+
<< ", ino: " << ino;
347+
}
348+
VLOG(9) << "FetchDentry end: " << file
349+
<< ", ino: " << ino;
350+
return;
351+
}
352+
135353
void FuseClient::UnInit() {
354+
bgCmdStop_.store(true, std::memory_order_release);
355+
WarmUpRun();
356+
if (bgCmdTaskThread_.joinable()) {
357+
bgCmdTaskThread_.join();
358+
}
359+
taskFetchMetaPool_.Stop();
136360
delete mdsBase_;
137361
mdsBase_ = nullptr;
138362
}
@@ -178,7 +402,6 @@ CURVEFS_ERROR FuseClient::FuseOpInit(void *userdata,
178402
<< ", mountPoint = " << mountpoint_.ShortDebugString();
179403
return CURVEFS_ERROR::MOUNT_FAILED;
180404
}
181-
182405
inodeManager_->SetFsId(fsInfo_->fsid());
183406
dentryManager_->SetFsId(fsInfo_->fsid());
184407
enableSumInDir_ = fsInfo_->enablesumindir() && !FLAGS_enableCto;
@@ -196,7 +419,7 @@ CURVEFS_ERROR FuseClient::FuseOpInit(void *userdata,
196419
}
197420

198421
init_ = true;
199-
422+
mounted_.store(true, std::memory_order_release);
200423
return CURVEFS_ERROR::OK;
201424
}
202425

@@ -318,7 +541,6 @@ CURVEFS_ERROR FuseClient::FuseOpOpen(fuse_req_t req, fuse_ino_t ino,
318541
<< ", inodeid = " << ino;
319542
return ret;
320543
}
321-
322544
::curve::common::UniqueLock lgGuard = inodeWrapper->GetUniqueLock();
323545
if (fi->flags & O_TRUNC) {
324546
if (fi->flags & O_WRONLY || fi->flags & O_RDWR) {
@@ -796,7 +1018,6 @@ CURVEFS_ERROR FuseClient::FuseOpSetAttr(fuse_req_t req, fuse_ino_t ino,
7961018
<< ", inodeid = " << ino;
7971019
return ret;
7981020
}
799-
8001021
::curve::common::UniqueLock lgGuard = inodeWrapper->GetUniqueLock();
8011022
Inode *inode = inodeWrapper->GetMutableInodeUnlocked();
8021023

0 commit comments

Comments
 (0)