Skip to content

Commit 11ae6bc

Browse files
committed
curvefs/client: add feature of warmup
1 parent 207f26b commit 11ae6bc

File tree

7 files changed

+1211
-17
lines changed

7 files changed

+1211
-17
lines changed

curvefs/src/client/fuse_client.cpp

+231-6
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include <set>
3232
#include <unordered_map>
3333
#include <utility>
34+
#include <boost/algorithm/string.hpp>
3435

3536
#include "curvefs/proto/mds.pb.h"
3637
#include "curvefs/src/client/fuse_common.h"
@@ -75,6 +76,15 @@ using rpcclient::Cli2ClientImpl;
7576
using rpcclient::MetaCache;
7677
using common::FLAGS_enableCto;
7778

79+
static bool checkWarmupListPath(const char*, const std::string& target) {
80+
// do something to check the path
81+
LOG(INFO) << "warmupListPath: " << target;
82+
return true;
83+
}
84+
DEFINE_string(warmupListPath, "",
85+
"the path to the list of files (dirs) that need to warmup.");
86+
DEFINE_validator(warmupListPath, checkWarmupListPath);
87+
7888
CURVEFS_ERROR FuseClient::Init(const FuseClientOption &option) {
7989
option_ = option;
8090

@@ -122,17 +132,235 @@ CURVEFS_ERROR FuseClient::Init(const FuseClientOption &option) {
122132
if (ret3 != CURVEFS_ERROR::OK) {
123133
return ret3;
124134
}
125-
126135
ret3 =
127136
dentryManager_->Init(option.dCacheLruSize, option.enableDCacheMetrics);
128137
if (ret3 != CURVEFS_ERROR::OK) {
129138
return ret3;
130139
}
131-
140+
warmUpFile_.exist = false;
141+
bgCmdTaskStop_.store(false, std::memory_order_release);
142+
bgCmdTaskThread_ = Thread(&FuseClient::BackGroundCmdTask, this);
143+
bgCmdStop_.store(false, std::memory_order_release);
144+
bgCmdThread_ = Thread(&FuseClient::BackGroundCmd, this);
145+
FLAGS_warmupListPath = "";
146+
taskFetchMetaPool_.Start(WARMUP_THREADS);
132147
return ret3;
133148
}
134149

150+
void FuseClient::BackGroundCmd() {
151+
while (!mounted_.load(std::memory_order_acquire)) {
152+
usleep(WARMUP_CHECKINTERVAL_US);
153+
VLOG(6) << "wait mount success.";
154+
continue;
155+
}
156+
std::string preWarmUpPath = FLAGS_warmupListPath;
157+
std::string warmUpPath;
158+
while (!bgCmdStop_.load(std::memory_order_acquire)) {
159+
warmUpPath = FLAGS_warmupListPath;
160+
if (warmUpPath == preWarmUpPath) {
161+
usleep(WARMUP_CHECKINTERVAL_US); // check interval
162+
continue;
163+
}
164+
VLOG(6) << "has new warmUp task: " << warmUpPath;
165+
preWarmUpPath = warmUpPath;
166+
PutWarmTask(warmUpPath);
167+
WarmUpRun();
168+
}
169+
return;
170+
}
171+
172+
void FuseClient::BackGroundCmdTask() {
173+
while (!mounted_.load(std::memory_order_acquire)) {
174+
usleep(WARMUP_CHECKINTERVAL_US);
175+
VLOG(6) << "wait mount success.";
176+
continue;
177+
}
178+
while (!bgCmdStop_.load(std::memory_order_acquire)) {
179+
std::list<std::string> readAheadPaths;
180+
WaitWarmUp();
181+
while (hasWarmTask()) {
182+
std::string warmUpTask;
183+
GetarmTask(&warmUpTask);
184+
if (warmUpTask.empty()) {
185+
continue;
186+
}
187+
VLOG(6) << "warmUp task is: " << warmUpTask;
188+
std::vector<std::string> splitPath;
189+
boost::split(splitPath, warmUpTask, boost::is_any_of("/"),
190+
boost::token_compress_on);
191+
Dentry dentry;
192+
CURVEFS_ERROR ret = dentryManager_->GetDentry(
193+
fsInfo_->rootinodeid(), splitPath[1], &dentry);
194+
if (ret != CURVEFS_ERROR::OK) {
195+
if (ret != CURVEFS_ERROR::NOTEXIST) {
196+
LOG(WARNING) << "dentryManager_ get dentry fail: "
197+
<< ret << ", name: " << warmUpTask;
198+
}
199+
VLOG(1) << "FetchDentry error: " << ret;
200+
return;
201+
}
202+
if (FsFileType::TYPE_S3 != dentry.type()) {
203+
VLOG(3) << "not a file: " << warmUpTask;
204+
return;
205+
}
206+
207+
fuse_ino_t ino = dentry.inodeid();
208+
std::shared_ptr<InodeWrapper> inodeWrapper;
209+
ret = inodeManager_->GetInode(ino, inodeWrapper);
210+
if (ret != CURVEFS_ERROR::OK) {
211+
LOG(ERROR) << "inodeManager get inode fail, ret = "
212+
<< ret << ", inodeid = " << ino;
213+
return;
214+
}
215+
uint64_t len = inodeWrapper->GetLength();
216+
VLOG(9) << "ino is: " << ino << ", len is: " << len;
217+
WarmUpFileContext_t warmUpFile{ino, len, true};
218+
SetWarmUpFile(warmUpFile);
219+
}
220+
}
221+
}
222+
223+
void FuseClient::FetchDentryEnqueue(std::string file) {
224+
VLOG(6) << "FetchDentryEnqueue start: " << file;
225+
auto task = [this, file]() {
226+
LookPath(file);
227+
};
228+
taskFetchMetaPool_.Enqueue(task);
229+
}
230+
231+
void FuseClient::LookPath(std::string file) {
232+
VLOG(6) << "LookPath start: " << file;
233+
// remove the blank
234+
boost::trim(file);
235+
std::vector<std::string> splitPath;
236+
boost::split(splitPath, file, boost::is_any_of("/"),
237+
boost::token_compress_on);
238+
if (splitPath.size() == 2
239+
&& splitPath.back().empty()) {
240+
VLOG(6) << "i am root";
241+
FetchChildDentryEnqueue(fsInfo_->rootinodeid());
242+
return;
243+
} else if (splitPath.size() == 2) {
244+
VLOG(6) << "parent is root: " << fsInfo_->rootinodeid()
245+
<< ", path is: " << splitPath[1];
246+
this->FetchDentry(fsInfo_->rootinodeid(), splitPath[1]);
247+
return;
248+
} else if (splitPath.size() > 2) { // travel path
249+
VLOG(6) << "traverse path size: " << splitPath.size();
250+
std::string lastName = splitPath.back();
251+
splitPath.pop_back();
252+
fuse_ino_t ino = fsInfo_->rootinodeid();
253+
auto iter = splitPath.begin();
254+
// the first member is always empty, so skip
255+
iter++;
256+
for (; iter != splitPath.end(); iter++) {
257+
VLOG(9) << "traverse path: " << *iter
258+
<< "ino is: " << ino;
259+
Dentry dentry;
260+
std::string pathName = *iter;
261+
CURVEFS_ERROR ret = dentryManager_->GetDentry(
262+
ino, pathName, &dentry);
263+
if (ret != CURVEFS_ERROR::OK) {
264+
if (ret != CURVEFS_ERROR::NOTEXIST) {
265+
LOG(WARNING) << "dentryManager_ get dentry fail, ret: "
266+
<< ret << ", parent inodeid: " << ino
267+
<< ", name: " << file;
268+
}
269+
VLOG(1) << "FetchDentry error: " << ret;
270+
return;
271+
}
272+
ino = dentry.inodeid();
273+
}
274+
this->FetchDentry(ino, lastName);
275+
VLOG(9) << "ino is: " << ino
276+
<< "lastname is: " << lastName;
277+
return;
278+
} else {
279+
VLOG(0) << "unknown path";
280+
}
281+
return;
282+
}
283+
284+
void FuseClient::FetchChildDentryEnqueue(fuse_ino_t ino) {
285+
auto task = [this, ino]() {
286+
// resolve层层递进,获得inode
287+
this->FetchChildDentry(ino);
288+
};
289+
taskFetchMetaPool_.Enqueue(task);
290+
}
291+
292+
void FuseClient::FetchChildDentry(fuse_ino_t ino) {
293+
VLOG(9) << "FetchChildDentry start: " << ino;
294+
std::list<Dentry> dentryList;
295+
auto limit = option_.listDentryLimit;
296+
CURVEFS_ERROR ret = dentryManager_->ListDentry(
297+
ino, &dentryList, limit);
298+
if (ret != CURVEFS_ERROR::OK) {
299+
LOG(ERROR) << "dentryManager_ ListDentry fail, ret = " << ret
300+
<< ", parent = " << ino;
301+
return;
302+
}
303+
for (auto iter : dentryList) {
304+
VLOG(9) << "FetchChildDentry: " << iter.name();
305+
if (FsFileType::TYPE_S3 == iter.type()) {
306+
std::unique_lock<std::mutex> lck(fetchMtx_);
307+
readAheadFiles_.push_front(iter.inodeid());
308+
VLOG(9) << "FetchChildDentry: " << iter.inodeid();;
309+
} else if (FsFileType::TYPE_DIRECTORY == iter.type()) {
310+
FetchChildDentryEnqueue(iter.inodeid());
311+
VLOG(9) << "FetchChildDentry: " << iter.inodeid();
312+
} else if (FsFileType::TYPE_SYM_LINK == iter.type()) { // need todo
313+
} else {
314+
VLOG(0) << "unknown type";
315+
}
316+
}
317+
return;
318+
}
319+
320+
void FuseClient::FetchDentry(fuse_ino_t ino, std::string file) {
321+
VLOG(9) << "FetchDentry start: " << file
322+
<< ", ino: " << ino;
323+
Dentry dentry;
324+
CURVEFS_ERROR ret = dentryManager_->GetDentry(ino, file, &dentry);
325+
if (ret != CURVEFS_ERROR::OK) {
326+
if (ret != CURVEFS_ERROR::NOTEXIST) {
327+
LOG(WARNING) << "dentryManager_ get dentry fail, ret = " << ret
328+
<< ", parent inodeid = " << ino
329+
<< ", name = " << file;
330+
}
331+
VLOG(1) << "FetchDentry error: " << ret;
332+
return;
333+
}
334+
if (FsFileType::TYPE_S3 == dentry.type()) {
335+
std::unique_lock<std::mutex> lck(fetchMtx_);
336+
readAheadFiles_.push_front(dentry.inodeid());
337+
return;
338+
} else if (FsFileType::TYPE_DIRECTORY == dentry.type()) {
339+
FetchChildDentryEnqueue(dentry.inodeid());
340+
VLOG(9) << "FetchDentry: " << dentry.inodeid();
341+
return;
342+
343+
} else if (FsFileType::TYPE_SYM_LINK == dentry.type()) {
344+
} else {
345+
VLOG(0) << "unkown, file: " << file
346+
<< ", ino: " << ino;
347+
}
348+
VLOG(9) << "FetchDentry end: " << file
349+
<< ", ino: " << ino;
350+
return;
351+
}
352+
135353
void FuseClient::UnInit() {
354+
bgCmdTaskStop_.store(true, std::memory_order_release);
355+
bgCmdStop_.store(true, std::memory_order_release);
356+
WarmUpRun();
357+
if (bgCmdTaskThread_.joinable()) {
358+
bgCmdTaskThread_.join();
359+
}
360+
if (bgCmdThread_.joinable()) {
361+
bgCmdThread_.join();
362+
}
363+
taskFetchMetaPool_.Stop();
136364
delete mdsBase_;
137365
mdsBase_ = nullptr;
138366
}
@@ -178,7 +406,6 @@ CURVEFS_ERROR FuseClient::FuseOpInit(void *userdata,
178406
<< ", mountPoint = " << mountpoint_.ShortDebugString();
179407
return CURVEFS_ERROR::MOUNT_FAILED;
180408
}
181-
182409
inodeManager_->SetFsId(fsInfo_->fsid());
183410
dentryManager_->SetFsId(fsInfo_->fsid());
184411
enableSumInDir_ = fsInfo_->enablesumindir() && !FLAGS_enableCto;
@@ -196,7 +423,7 @@ CURVEFS_ERROR FuseClient::FuseOpInit(void *userdata,
196423
}
197424

198425
init_ = true;
199-
426+
mounted_.store(true, std::memory_order_release);
200427
return CURVEFS_ERROR::OK;
201428
}
202429

@@ -318,7 +545,6 @@ CURVEFS_ERROR FuseClient::FuseOpOpen(fuse_req_t req, fuse_ino_t ino,
318545
<< ", inodeid = " << ino;
319546
return ret;
320547
}
321-
322548
::curve::common::UniqueLock lgGuard = inodeWrapper->GetUniqueLock();
323549
if (fi->flags & O_TRUNC) {
324550
if (fi->flags & O_WRONLY || fi->flags & O_RDWR) {
@@ -796,7 +1022,6 @@ CURVEFS_ERROR FuseClient::FuseOpSetAttr(fuse_req_t req, fuse_ino_t ino,
7961022
<< ", inodeid = " << ino;
7971023
return ret;
7981024
}
799-
8001025
::curve::common::UniqueLock lgGuard = inodeWrapper->GetUniqueLock();
8011026
Inode *inode = inodeWrapper->GetMutableInodeUnlocked();
8021027

0 commit comments

Comments
 (0)