Skip to content

Commit 1a4c055

Browse files
committed
curvefs/client: add feature of warmup
1 parent 207f26b commit 1a4c055

9 files changed

+1247
-17
lines changed

curvefs/src/client/fuse_client.cpp

+201-6
Original file line numberDiff line numberDiff line change
@@ -122,17 +122,215 @@ CURVEFS_ERROR FuseClient::Init(const FuseClientOption &option) {
122122
if (ret3 != CURVEFS_ERROR::OK) {
123123
return ret3;
124124
}
125-
126125
ret3 =
127126
dentryManager_->Init(option.dCacheLruSize, option.enableDCacheMetrics);
128127
if (ret3 != CURVEFS_ERROR::OK) {
129128
return ret3;
130129
}
131-
130+
warmUpFile_.exist = false;
131+
bgCmdStop_.store(false, std::memory_order_release);
132+
bgCmdTaskThread_ = Thread(&FuseClient::WarmUpTask, this);
133+
taskFetchMetaPool_.Start(WARMUP_THREADS);
132134
return ret3;
133135
}
134136

137+
void FuseClient::WarmUpTask() {
138+
// TODO(hzwuhongsong): Maybe we can start the warmup thread after mount
139+
while (!mounted_.load(std::memory_order_acquire)) {
140+
usleep(WARMUP_CHECKINTERVAL_US);
141+
VLOG(6) << "wait mount success.";
142+
continue;
143+
}
144+
while (!bgCmdStop_.load(std::memory_order_acquire)) {
145+
std::list<std::string> readAheadPaths;
146+
WaitWarmUp();
147+
while (hasWarmTask()) {
148+
std::string warmUpTask;
149+
GetwarmTask(&warmUpTask);
150+
VLOG(9) << "warmup task is: " << warmUpTask;
151+
std::string pDelimiter = "/";
152+
char* pToken = nullptr;
153+
char* pSave = nullptr;
154+
pToken = strtok_r(const_cast<char*>(warmUpTask.c_str()),
155+
const_cast<char*>(pDelimiter.c_str()), &pSave);
156+
if (nullptr == pToken) {
157+
VLOG(3) << "warmUpTask nullptr";
158+
continue;
159+
}
160+
Dentry dentry;
161+
CURVEFS_ERROR ret = dentryManager_->GetDentry(
162+
fsInfo_->rootinodeid(), pToken, &dentry);
163+
if (ret != CURVEFS_ERROR::OK) {
164+
LOG(WARNING) << "FetchDentry error: " << ret
165+
<< ", name: " << warmUpTask;
166+
return;
167+
}
168+
if (FsFileType::TYPE_S3 != dentry.type()) {
169+
LOG(WARNING) << "not a file: " << warmUpTask
170+
<< "type is: " << dentry.type();
171+
return;
172+
}
173+
fuse_ino_t ino = dentry.inodeid();
174+
std::shared_ptr<InodeWrapper> inodeWrapper;
175+
ret = inodeManager_->GetInode(ino, inodeWrapper);
176+
if (ret != CURVEFS_ERROR::OK) {
177+
LOG(ERROR) << "inodeManager get inode fail, ret = "
178+
<< ret << ", inodeid = " << ino;
179+
return;
180+
}
181+
uint64_t len = inodeWrapper->GetLength();
182+
VLOG(9) << "ino is: " << ino << ", len is: " << len;
183+
WarmUpFileContext_t warmUpFile{ino, len, true};
184+
SetWarmUpFile(warmUpFile);
185+
}
186+
}
187+
}
188+
189+
void FuseClient::FetchDentryEnqueue(std::string file) {
190+
VLOG(9) << "FetchDentryEnqueue start: " << file;
191+
auto task = [this, file]() {
192+
LookPath(file);
193+
};
194+
taskFetchMetaPool_.Enqueue(task);
195+
}
196+
197+
void FuseClient::LookPath(std::string file) {
198+
VLOG(9) << "LookPath start: " << file;
199+
std::vector<std::string> splitPath;
200+
// remove enter, newline, blank
201+
std::string blanks("\r\n ");
202+
file.erase(0, file.find_first_not_of(blanks));
203+
file.erase(file.find_last_not_of(blanks) + 1);
204+
if (file.empty()) {
205+
VLOG(9) << "empty path";
206+
return;
207+
}
208+
bool isRoot = false;
209+
if (file == "/") {
210+
splitPath.push_back(file);
211+
isRoot = true;
212+
} else {
213+
splitStr(file, "/", &splitPath);
214+
}
215+
VLOG(6) << "splitPath size is: " << splitPath.size();
216+
if (splitPath.size() == 1 && isRoot) {
217+
VLOG(9) << "i am root";
218+
FetchChildDentryEnqueue(fsInfo_->rootinodeid());
219+
return;
220+
} else if (splitPath.size() == 1) {
221+
VLOG(9) << "parent is root: " << fsInfo_->rootinodeid()
222+
<< ", path is: " << splitPath[0];
223+
this->FetchDentry(fsInfo_->rootinodeid(), splitPath[0]);
224+
return;
225+
} else if (splitPath.size() > 1) { // travel path
226+
VLOG(9) << "traverse path start: " << splitPath.size();
227+
std::string lastName = splitPath.back();
228+
splitPath.pop_back();
229+
fuse_ino_t ino = fsInfo_->rootinodeid();
230+
for (auto iter : splitPath) {
231+
VLOG(9) << "traverse path: " << iter
232+
<< "ino is: " << ino;
233+
Dentry dentry;
234+
std::string pathName = iter;
235+
CURVEFS_ERROR ret = dentryManager_->GetDentry(
236+
ino, pathName, &dentry);
237+
if (ret != CURVEFS_ERROR::OK) {
238+
if (ret != CURVEFS_ERROR::NOTEXIST) {
239+
LOG(WARNING) << "dentryManager_ get dentry fail, ret = "
240+
<< ret << ", parent inodeid = " << ino
241+
<< ", name = " << file;
242+
}
243+
VLOG(9) << "FetchDentry error: " << ret;
244+
return;
245+
}
246+
ino = dentry.inodeid();
247+
}
248+
this->FetchDentry(ino, lastName);
249+
VLOG(9) << "ino is: " << ino
250+
<< "lastname is: " << lastName;
251+
return;
252+
} else {
253+
VLOG(3) << "unknown path";
254+
}
255+
return;
256+
}
257+
258+
void FuseClient::FetchChildDentryEnqueue(fuse_ino_t ino) {
259+
auto task = [this, ino]() {
260+
// reverse from root
261+
this->FetchChildDentry(ino);
262+
};
263+
taskFetchMetaPool_.Enqueue(task);
264+
}
265+
266+
void FuseClient::FetchChildDentry(fuse_ino_t ino) {
267+
VLOG(9) << "FetchChildDentry start: " << ino;
268+
std::list<Dentry> dentryList;
269+
auto limit = option_.listDentryLimit;
270+
CURVEFS_ERROR ret = dentryManager_->ListDentry(
271+
ino, &dentryList, limit);
272+
if (ret != CURVEFS_ERROR::OK) {
273+
LOG(ERROR) << "dentryManager_ ListDentry fail, ret = " << ret
274+
<< ", parent = " << ino;
275+
return;
276+
}
277+
for (auto iter : dentryList) {
278+
VLOG(9) << "FetchChildDentry: " << iter.name();
279+
if (FsFileType::TYPE_S3 == iter.type()) {
280+
std::unique_lock<std::mutex> lck(fetchMtx_);
281+
readAheadFiles_.push_front(iter.inodeid());
282+
VLOG(9) << "FetchChildDentry: " << iter.inodeid();;
283+
} else if (FsFileType::TYPE_DIRECTORY == iter.type()) {
284+
FetchChildDentryEnqueue(iter.inodeid());
285+
VLOG(9) << "FetchChildDentry: " << iter.inodeid();
286+
} else if (FsFileType::TYPE_SYM_LINK == iter.type()) { // need todo
287+
} else {
288+
VLOG(9) << "unknown type";
289+
}
290+
}
291+
return;
292+
}
293+
294+
void FuseClient::FetchDentry(fuse_ino_t ino, std::string file) {
295+
VLOG(9) << "FetchDentry start: " << file
296+
<< ", ino: " << ino;
297+
Dentry dentry;
298+
CURVEFS_ERROR ret = dentryManager_->GetDentry(ino, file, &dentry);
299+
if (ret != CURVEFS_ERROR::OK) {
300+
if (ret != CURVEFS_ERROR::NOTEXIST) {
301+
LOG(WARNING) << "dentryManager_ get dentry fail, ret = " << ret
302+
<< ", parent inodeid = " << ino
303+
<< ", name = " << file;
304+
}
305+
VLOG(1) << "FetchDentry error: " << ret;
306+
return;
307+
}
308+
if (FsFileType::TYPE_S3 == dentry.type()) {
309+
std::unique_lock<std::mutex> lck(fetchMtx_);
310+
readAheadFiles_.push_front(dentry.inodeid());
311+
return;
312+
} else if (FsFileType::TYPE_DIRECTORY == dentry.type()) {
313+
FetchChildDentryEnqueue(dentry.inodeid());
314+
VLOG(9) << "FetchDentry: " << dentry.inodeid();
315+
return;
316+
317+
} else if (FsFileType::TYPE_SYM_LINK == dentry.type()) {
318+
} else {
319+
VLOG(3) << "unkown, file: " << file
320+
<< ", ino: " << ino;
321+
}
322+
VLOG(9) << "FetchDentry end: " << file
323+
<< ", ino: " << ino;
324+
return;
325+
}
326+
135327
void FuseClient::UnInit() {
328+
bgCmdStop_.store(true, std::memory_order_release);
329+
WarmUpRun();
330+
if (bgCmdTaskThread_.joinable()) {
331+
bgCmdTaskThread_.join();
332+
}
333+
taskFetchMetaPool_.Stop();
136334
delete mdsBase_;
137335
mdsBase_ = nullptr;
138336
}
@@ -178,7 +376,6 @@ CURVEFS_ERROR FuseClient::FuseOpInit(void *userdata,
178376
<< ", mountPoint = " << mountpoint_.ShortDebugString();
179377
return CURVEFS_ERROR::MOUNT_FAILED;
180378
}
181-
182379
inodeManager_->SetFsId(fsInfo_->fsid());
183380
dentryManager_->SetFsId(fsInfo_->fsid());
184381
enableSumInDir_ = fsInfo_->enablesumindir() && !FLAGS_enableCto;
@@ -196,7 +393,7 @@ CURVEFS_ERROR FuseClient::FuseOpInit(void *userdata,
196393
}
197394

198395
init_ = true;
199-
396+
mounted_.store(true, std::memory_order_release);
200397
return CURVEFS_ERROR::OK;
201398
}
202399

@@ -318,7 +515,6 @@ CURVEFS_ERROR FuseClient::FuseOpOpen(fuse_req_t req, fuse_ino_t ino,
318515
<< ", inodeid = " << ino;
319516
return ret;
320517
}
321-
322518
::curve::common::UniqueLock lgGuard = inodeWrapper->GetUniqueLock();
323519
if (fi->flags & O_TRUNC) {
324520
if (fi->flags & O_WRONLY || fi->flags & O_RDWR) {
@@ -796,7 +992,6 @@ CURVEFS_ERROR FuseClient::FuseOpSetAttr(fuse_req_t req, fuse_ino_t ino,
796992
<< ", inodeid = " << ino;
797993
return ret;
798994
}
799-
800995
::curve::common::UniqueLock lgGuard = inodeWrapper->GetUniqueLock();
801996
Inode *inode = inodeWrapper->GetMutableInodeUnlocked();
802997

0 commit comments

Comments
 (0)