Skip to content

Commit

Permalink
Scan through the last wal in case wal is cut off. (vesoft-inc#1194)
Browse files Browse the repository at this point in the history
* fix wal

* add more ut

* address @dangleptr's comments

* truncate when rollback or last wal is illegal

* update

* fix ut
  • Loading branch information
critical27 authored and dangleptr committed Nov 5, 2019
1 parent d2540dd commit 75326df
Show file tree
Hide file tree
Showing 3 changed files with 230 additions and 51 deletions.
173 changes: 124 additions & 49 deletions src/kvstore/wal/FileBasedWal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,10 @@ FileBasedWal::FileBasedWal(const folly::StringPiece dir,
if (!walFiles_.empty()) {
firstLogId_ = walFiles_.begin()->second->firstId();
auto& info = walFiles_.rbegin()->second;
if (info->lastId() <= 0 && walFiles_.size() > 1) {
auto it = walFiles_.rbegin();
it++;
lastLogId_ = info->firstId() - 1;
lastLogTerm_ = readTermId(it->second->path(), lastLogId_);
} else {
lastLogId_ = info->lastId();
lastLogTerm_ = info->lastTerm();
}
lastLogId_ = info->lastId();
lastLogTerm_ = info->lastTerm();
LOG(INFO) << idStr_ << "lastLogId in wal is " << lastLogId_
<< ", lastLogTerm is " << lastLogTerm_;
currFd_ = open(info->path(), O_WRONLY | O_APPEND);
currInfo_ = info;
CHECK_GE(currFd_, 0);
Expand All @@ -77,8 +72,7 @@ FileBasedWal::~FileBasedWal() {


void FileBasedWal::scanAllWalFiles() {
std::vector<std::string> files =
FileUtils::listAllFilesInDir(dir_.c_str(), false, "*.wal");
std::vector<std::string> files = FileUtils::listAllFilesInDir(dir_.c_str(), false, "*.wal");
for (auto& fn : files) {
// Split the file name
// The file name convention is "<first id in the file>.wal"
Expand All @@ -100,6 +94,7 @@ void FileBasedWal::scanAllWalFiles() {
WalFileInfoPtr info = std::make_shared<WalFileInfo>(
FileUtils::joinPath(dir_, fn),
startIdFromName);
walFiles_.insert(std::make_pair(startIdFromName, info));

// Get the size of the file and the mtime
struct stat st;
Expand All @@ -116,7 +111,6 @@ void FileBasedWal::scanAllWalFiles() {
LOG(WARNING) << "Found empty wal file \"" << fn << "\"";
info->setLastId(0);
info->setLastTerm(0);
walFiles_.insert(std::make_pair(startIdFromName, info));
continue;
}

Expand Down Expand Up @@ -238,7 +232,16 @@ void FileBasedWal::scanAllWalFiles() {

// We now get all necessary info
close(fd);
walFiles_.insert(std::make_pair(startIdFromName, info));
}

if (!walFiles_.empty()) {
auto it = walFiles_.rbegin();
// Try to scan the last wal; if it is invalid or empty, remove it so the previous one becomes the last
scanLastWal(it->second, it->second->firstId());
if (it->second->lastId() <= 0) {
unlink(it->second->path());
walFiles_.erase(it->first);
}
}

// Make sure there is no gap in the logs
Expand Down Expand Up @@ -321,38 +324,31 @@ void FileBasedWal::prepareNewFile(LogID startLogId) {
}


TermID FileBasedWal::readTermId(const char* path, LogID logId) {
int32_t fd = open(path, O_RDONLY);
void FileBasedWal::rollbackInFile(WalFileInfoPtr info, LogID logId) {
auto path = info->path();
int32_t fd = open(path, O_RDWR);
if (fd < 0) {
LOG(FATAL) << "Failed to open file \"" << path
<< "\" (errno: " << errno << "): "
<< strerror(errno);
}

size_t pos = 0;
LogID id = 0;
TermID term = 0;
while (true) {
// Read the log Id
LogID id = 0;
if (pread(fd, &id, sizeof(LogID), pos) != sizeof(LogID)) {
LOG(ERROR) << "Failed to read the log id (errno "
<< errno << "): " << strerror(errno);
close(fd);
return 0;
break;
}

// Read the term Id
TermID term = 0;
if (pread(fd, &term, sizeof(TermID), pos + sizeof(LogID)) != sizeof(TermID)) {
LOG(ERROR) << "Failed to read the term id (errno "
<< errno << "): " << strerror(errno);
close(fd);
return 0;
}

if (id == logId) {
// Found
close(fd);
return term;
break;
}

// Read the message length
Expand All @@ -361,8 +357,7 @@ TermID FileBasedWal::readTermId(const char* path, LogID logId) {
!= sizeof(int32_t)) {
LOG(ERROR) << "Failed to read the message length (errno "
<< errno << "): " << strerror(errno);
close(fd);
return 0;
break;
}

// Move to the next log
Expand All @@ -371,9 +366,106 @@ TermID FileBasedWal::readTermId(const char* path, LogID logId) {
+ sizeof(ClusterID)
+ 2 * sizeof(int32_t)
+ len;

if (id == logId) {
break;
}
}

if (id != logId) {
LOG(FATAL) << idStr_ << "Didn't found log " << logId << " in " << path;
}
lastLogId_ = logId;
lastLogTerm_ = term;
LOG(INFO) << idStr_ << "Rollback to log " << logId;

if (0 < pos && pos < FileUtils::fileSize(path)) {
LOG(INFO) << idStr_ << "Need to truncate from offset " << pos;
if (ftruncate(fd, pos) < 0) {
LOG(FATAL) << "Failed to truncate file \"" << path
<< "\" (errno: " << errno << "): "
<< strerror(errno);
}
info->setSize(pos);
}
close(fd);
}


// Scan the given wal file from the beginning and keep only its valid prefix.
//
// Every record is expected to start with a LogID, a TermID and an int32
// length ("head"), and to end with the same int32 length ("foot") located
// sizeof(ClusterID) + head bytes after the head. Log ids must be consecutive,
// starting at `firstId`.
//
// For every record that passes these checks, info's last id/term are updated.
// Scanning stops at the first short read or inconsistent record; if the file
// has bytes past the last valid record, the file is truncated at that offset
// and info's size is updated. When not a single record is valid, info's last
// id/term are left untouched and the file is not truncated.
//
// @param info     wal file to scan (updated in place)
// @param firstId  log id the first record in the file must carry
void FileBasedWal::scanLastWal(WalFileInfoPtr info, LogID firstId) {
    auto* path = info->path();
    int32_t fd = open(path, O_RDWR);
    if (fd < 0) {
        LOG(FATAL) << "Failed to open file \"" << path
                   << "\" (errno: " << errno << "): "
                   << strerror(errno);
    }

    // Offsets relative to the beginning of a record
    const size_t termOffset = sizeof(LogID);
    const size_t headOffset = termOffset + sizeof(TermID);
    const size_t bodyOffset = headOffset + sizeof(int32_t);

    LogID curLogId = firstId;
    size_t pos = 0;
    LogID id = 0;
    TermID term = 0;
    int32_t head = 0;
    int32_t foot = 0;
    while (true) {
        // Read the log Id
        if (pread(fd, &id, sizeof(LogID), pos) != sizeof(LogID)) {
            break;
        }

        if (id != curLogId) {
            LOG(ERROR) << "LogId is not consistent" << id << " " << curLogId;
            break;
        }

        // Read the term Id
        if (pread(fd, &term, sizeof(TermID), pos + termOffset) != sizeof(TermID)) {
            break;
        }

        // Read the message length
        if (pread(fd, &head, sizeof(int32_t), pos + headOffset) != sizeof(int32_t)) {
            break;
        }

        // A negative length can only come from a corrupted record; never use
        // it to compute the position of the foot
        if (head < 0) {
            LOG(ERROR) << "Invalid message size: " << head;
            break;
        }

        // Read the trailing copy of the message length
        const size_t footOffset = bodyOffset + sizeof(ClusterID) + head;
        if (pread(fd, &foot, sizeof(int32_t), pos + footOffset) != sizeof(int32_t)) {
            break;
        }

        if (head != foot) {
            LOG(ERROR) << "Message size doen't match: " << head << " != " << foot;
            break;
        }

        // The record is complete, remember it as the last valid one
        info->setLastTerm(term);
        info->setLastId(id);

        // Move to the next log
        pos += footOffset + sizeof(int32_t);
        ++curLogId;
    }
    LOG(INFO) << idStr_ << "Scan last wal " << path << ", last wal id is " << id;

    // Drop the incomplete/invalid tail, if any
    if (0 < pos && pos < FileUtils::fileSize(path)) {
        LOG(WARNING) << "Invalid wal " << path << ", truncate from offset " << pos;
        if (ftruncate(fd, pos) < 0) {
            LOG(FATAL) << "Failed to truncate file \"" << path
                       << "\" (errno: " << errno << "): "
                       << strerror(errno);
        }
        info->setSize(pos);
    }

    close(fd);
}


Expand Down Expand Up @@ -541,8 +633,7 @@ bool FileBasedWal::rollbackToLog(LogID id) {
VLOG(1) << "Roll back to log " << id
<< ", the last WAL file is now \""
<< walFiles_.rbegin()->second->path() << "\"";
lastLogId_ = id;
lastLogTerm_ = readTermId(walFiles_.rbegin()->second->path(), lastLogId_);
rollbackInFile(walFiles_.rbegin()->second, id);
}

// Create the next WAL file
Expand All @@ -553,24 +644,8 @@ bool FileBasedWal::rollbackToLog(LogID id) {
// 2. Roll back in-memory buffers
//------------------------------
{
// First rollback from buffers
std::unique_lock<std::mutex> g(buffersMutex_);

// Remove all buffers that are rolled back
auto it = buffers_.begin();
while (it != buffers_.end() && (*it)->firstLogId() <= id) {
it++;
}
while (it != buffers_.end()) {
it = buffers_.erase(it);
}

// Need to rollover to a new buffer
if (buffers_.size() == policy_.numBuffers) {
// Need to pop the first one
buffers_.pop_front();
}
buffers_.emplace_back(std::make_shared<InMemoryLogBuffer>(id + 1));
buffers_.clear();
}

return true;
Expand Down
7 changes: 5 additions & 2 deletions src/kvstore/wal/FileBasedWal.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ using PreProcessor = folly::Function<bool(LogID, TermID, ClusterID, const std::s
class FileBasedWal final : public Wal
, public std::enable_shared_from_this<FileBasedWal> {
FRIEND_TEST(FileBasedWal, TTLTest);
FRIEND_TEST(FileBasedWal, CheckLastWalTest);
friend class FileBasedWalIterator;
public:
// A factory method to create a new WAL
Expand Down Expand Up @@ -141,12 +142,14 @@ class FileBasedWal final : public Wal
// Scan all WAL files
void scanAllWalFiles();

void scanLastWal(WalFileInfoPtr info, LogID firstId);

// Close down the current wal file
void closeCurrFile();
// Prepare a new wal file starting from the given log id
void prepareNewFile(LogID startLogId);
// Retrieve the term id for the given log id in the given WAL file
TermID readTermId(const char* path, LogID logId);
// Rollback to logId in given file
void rollbackInFile(WalFileInfoPtr info, LogID logId);

// Return the last buffer.
// If the last buffer is big enough, create a new one
Expand Down
101 changes: 101 additions & 0 deletions src/kvstore/wal/test/FileBasedWalTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,107 @@ TEST(FileBasedWal, TTLTest) {
}
}

// Checks that a wal whose last file is cut off, nearly empty or empty is
// reopened with the expected last log id, and that the last log id set by
// rollbackToLog is still reported after reopening the wal.
TEST(FileBasedWal, CheckLastWalTest) {
    FileBasedWalPolicy policy;
    policy.fileSize = 1024L * 1024L;
    TempDir walDir("/tmp/testWal.XXXXXX");

    // (Re)open the wal in walDir with a pre-processor accepting every log
    auto openWal = [&walDir, &policy] () {
        return FileBasedWal::getWal(walDir.path(),
                                    "",
                                    policy,
                                    [](LogID, TermID, ClusterID, const std::string&) {
                                        return true;
                                    });
    };

    // Cut the given file down to newSize bytes. Failures are reported instead
    // of being ignored, otherwise the checks below would run on an intact wal
    auto truncateFile = [] (const std::string& path, off_t newSize) {
        auto fd = open(path.c_str(), O_WRONLY | O_APPEND);
        ASSERT_GE(fd, 0) << "Failed to open " << path;
        EXPECT_EQ(0, ftruncate(fd, newSize)) << "Failed to truncate " << path;
        close(fd);
    };

    auto wal = openWal();
    {
        EXPECT_EQ(0, wal->lastLogId());

        for (int i = 1; i <= 1000; i++) {
            EXPECT_TRUE(
                wal->appendLog(i /*id*/, 1 /*term*/, 0 /*cluster*/,
                               folly::stringPrintf(kLongMsg, i)));
        }
        EXPECT_EQ(1000, wal->lastLogId());
        wal.reset();
    }
    {
        // Modify the wal file, make last wal invalid
        std::vector<std::string> files =
            FileUtils::listAllFilesInDir(walDir.path(), true, "*.wal");
        ASSERT_FALSE(files.empty());
        std::sort(files.begin(), files.end());
        size_t size = FileUtils::fileSize(files.back().c_str());
        // Chop off the trailing length field of the last log
        ASSERT_NO_FATAL_FAILURE(truncateFile(files.back(), size - sizeof(int32_t)));

        // Now let's open it to read
        wal = openWal();
        EXPECT_EQ(999, wal->lastLogId());
    }
    {
        // get lastId in previous wal, make last wal invalid
        std::vector<std::string> files =
            FileUtils::listAllFilesInDir(walDir.path(), true, "*.wal");
        ASSERT_FALSE(files.empty());
        std::sort(files.begin(), files.end());
        auto lastWalPath = files.back();
        // A previous wal file must exist before stepping back to it
        ASSERT_GT(wal->walFiles_.size(), 1UL);
        auto it = wal->walFiles_.rbegin();
        it++;
        auto expected = it->second->lastId();
        wal.reset();

        // Keep only the log id and the term id of the first log in the last wal
        ASSERT_NO_FATAL_FAILURE(truncateFile(lastWalPath, sizeof(LogID) + sizeof(TermID)));

        // Now let's open it to read
        wal = openWal();
        EXPECT_EQ(expected, wal->lastLogId());
        wal.reset();

        // truncate last wal
        ASSERT_NO_FATAL_FAILURE(truncateFile(lastWalPath, 0));

        // Now let's open it to read
        wal = openWal();
        EXPECT_EQ(expected, wal->lastLogId());

        // Append more log, and reset to 1000, the last wal should be an empty file
        for (int i = expected + 1; i <= expected + 1000; i++) {
            EXPECT_TRUE(wal->appendLog(i /*id*/, 1 /*term*/, 0 /*cluster*/,
                                       folly::stringPrintf(kLongMsg, i)));
        }
        EXPECT_EQ(expected + 1000, wal->lastLogId());

        wal->rollbackToLog(1000);
        ASSERT_EQ(1000, wal->lastLogId());
        wal.reset();

        // Now let's open it to read
        wal = openWal();
        EXPECT_EQ(1000, wal->lastLogId());
    }
}


} // namespace wal
} // namespace nebula

Expand Down

0 comments on commit 75326df

Please sign in to comment.