Skip to content

Commit

Permalink
full synchronization between master and slave (#528)
Browse files Browse the repository at this point in the history
  • Loading branch information
Axlgrep committed May 15, 2019
1 parent 263ef95 commit 5e4c21c
Show file tree
Hide file tree
Showing 10 changed files with 300 additions and 38 deletions.
9 changes: 9 additions & 0 deletions include/pika_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ class Partition : public std::enable_shared_from_this<Partition> {
ReplState State();
void MarkTryConnectState();
void MarkWaitReplyState();
void MarkWaitDBSyncState();
bool FullSync();
void SetFullSync(bool full_sync);

void PrepareRsync();
bool TryUpdateMasterOffset();
bool ChangeDb(const std::string& new_path);

void Leave();
void Close();
Expand Down Expand Up @@ -116,6 +123,7 @@ class Partition : public std::enable_shared_from_this<Partition> {
std::string log_path_;
std::string trash_path_;
std::string bgsave_sub_path_;
std::string dbsync_path_;
std::string partition_name_;

bool opened_;
Expand All @@ -128,6 +136,7 @@ class Partition : public std::enable_shared_from_this<Partition> {

pthread_rwlock_t state_rwlock_; // protect partition status below
ReplState repl_state_;
bool full_sync_;


/*
Expand Down
3 changes: 2 additions & 1 deletion include/pika_repl_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ class PikaReplClient {
Status SendMetaSync();
Status SendPartitionTrySync(const std::string& table_name,
uint32_t partition_id,
const BinlogOffset& boffset);
const BinlogOffset& boffset,
bool force);
Status SendBinlogSync(const RmNode& slave);

Status TriggerSendBinlogSync();
Expand Down
2 changes: 1 addition & 1 deletion include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ enum TaskType {
kStartKeyScan,
kStopKeyScan,
kBgSave,
kMarkTryConnectState
};

class PikaServer {
Expand Down Expand Up @@ -183,6 +182,7 @@ class PikaServer {
bool GetTablePartitionBinlogOffset(const std::string& table_name,
uint32_t partition_id,
BinlogOffset* const boffset);
void PreparePartitionTrySync();
std::shared_ptr<Partition> GetTablePartitionById(
const std::string& table_name,
uint32_t partition_id);
Expand Down
3 changes: 0 additions & 3 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,6 @@ void SlaveofCmd::Do(std::shared_ptr<Partition> partition) {
return;
}

// Stop rsync
LOG(INFO) << "Start slaveof, stop rsync first";
slash::StopRsync(g_pika_conf->db_sync_path());
g_pika_server->RemoveMaster();

if (is_noone_) {
Expand Down
19 changes: 8 additions & 11 deletions src/pika_auxiliary_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,11 @@ void* PikaAuxiliaryThread::ThreadMain() {
g_pika_server->SendMetaSyncRequest();
LOG(INFO) << "Send meta sync request finish";
continue;
}
if (g_pika_server->ShouldMarkTryConnect()) {
g_pika_server->DoSameThingEveryPartition(TaskType::kMarkTryConnectState);
g_pika_server->MarkTryConnectDone();
LOG(INFO) << "mark try connect finish";
} else if (g_pika_server->ShouldMarkTryConnect()) {
g_pika_server->PreparePartitionTrySync();
LOG(INFO) << "Mark try connect finish";
continue;
}
if (g_pika_server->ShouldTrySyncPartition()) {
} else if (g_pika_server->ShouldTrySyncPartition()) {
RunEveryPartitionStateMachine();
}
// TODO(whoiami) timeout
Expand Down Expand Up @@ -61,13 +58,13 @@ void PikaAuxiliaryThread::RunEveryPartitionStateMachine() {
<< table.table_name << " Partition Id: " << idx;
continue;
}
if (partition->State() == ReplState::kWaitReply
|| partition->State() == ReplState::kConnected) {
continue;
}
if (partition->State() == ReplState::kTryConnect) {
g_pika_server->SendPartitionTrySyncRequest(partition);
} else if (partition->State() == ReplState::kWaitReply) {
continue;
} else if (partition->State() == ReplState::kWaitDBSync) {
partition->TryUpdateMasterOffset();
} else if (partition->State() == ReplState::kConnected) {
}
}
}
Expand Down
156 changes: 156 additions & 0 deletions src/pika_partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,20 @@ std::string BgsaveSubPath(const std::string& table_name,
return std::string(buf);
}

// Builds the directory path a partition uses for db sync data.
//
// sync_path is used as a prefix exactly as given: no separator is inserted
// between it and table_name.
//   classic_mode == true  -> "<sync_path><table_name>/"
//   classic_mode == false -> "<sync_path><table_name>/<partition_id>/"
//
// The path is assembled with std::string rather than a fixed 256-byte
// snprintf buffer, so a long table name is never silently truncated.
std::string DbSyncPath(const std::string& sync_path,
                       const std::string& table_name,
                       const uint32_t partition_id,
                       bool classic_mode) {
  std::string path(sync_path);
  path += table_name;
  path += '/';
  if (!classic_mode) {
    // Sharded layout: one extra directory level per partition id.
    path += std::to_string(partition_id);
    path += '/';
  }
  return path;
}

Partition::Partition(const std::string& table_name,
uint32_t partition_id,
const std::string& table_db_path,
Expand All @@ -44,6 +58,7 @@ Partition::Partition(const std::string& table_name,
partition_id_(partition_id),
binlog_io_error_(false),
repl_state_(ReplState::kNoConnect),
full_sync_(false),
bgsave_engine_(NULL),
purging_(false) {

Expand All @@ -55,6 +70,8 @@ Partition::Partition(const std::string& table_name,
table_trash_path : PartitionPath(table_trash_path, partition_id_);
bgsave_sub_path_ = g_pika_conf->classic_mode() ?
table_name : BgsaveSubPath(table_name_, partition_id_);
dbsync_path_ = DbSyncPath(g_pika_conf->db_sync_path(), table_name_,
partition_id_, g_pika_conf->classic_mode());
partition_name_ = g_pika_conf->classic_mode() ?
table_name : PartitionName(table_name_, partition_id_);

Expand Down Expand Up @@ -263,6 +280,145 @@ void Partition::MarkWaitReplyState() {
repl_state_ = ReplState::kWaitReply;
}

// Moves this partition's replication state to ReplState::kWaitDBSync.
// The assignment is made while a slash::RWLock on state_rwlock_ is held
// (constructed with `true`; presumably write mode - confirm in slash).
void Partition::MarkWaitDBSyncState() {
  slash::RWLock state_guard(&state_rwlock_, true);
  repl_state_ = ReplState::kWaitDBSync;
}

// Returns the current value of the full_sync_ flag.
// NOTE(review): the class declares full_sync_ right after repl_state_ under
// the "state_rwlock_ ... protect partition status below" comment, yet this
// read takes no lock - confirm every access happens on a single thread.
bool Partition::FullSync() {
return full_sync_;
}

// Stores full_sync into the full_sync_ flag.
// NOTE(review): the write is done without taking state_rwlock_, although the
// class declares full_sync_ under the "protect partition status below"
// comment - confirm callers are serialized some other way.
void Partition::SetFullSync(bool full_sync) {
full_sync_ = full_sync;
}

// Resets the partition's db sync directory: deletes whatever is at
// dbsync_path_, then creates one sub-directory per data type beneath it.
// The results of the slash helpers are not checked.
void Partition::PrepareRsync() {
  static const char* const kDataTypeDirs[] = {
    "strings", "hashes", "lists", "sets", "zsets"
  };

  slash::DeleteDirIfExist(dbsync_path_);
  for (const char* type_dir : kDataTypeDirs) {
    slash::CreatePath(dbsync_path_ + type_dir);
  }
}

// Try to update master offset.
// This may happen when the dbsync from master has finished.
// Here we do:
//   1. Check dbsync finished (the info file exists) and read the new binlog
//      offset from it
//   2. Replace the old db
//   3. Update master offset and mark the partition kTryConnect, so that the
//      PikaAuxiliaryThread cron will connect and do the slaveof task with
//      master
//
// Info file layout as parsed below, one value per line:
//   line 1: ignored
//   line 2: master ip
//   line 3: master port
//   line 4: binlog filenum
//   line 5: binlog offset
// Lines 3-5 must parse as non-negative integers. A file with more than five
// lines is rejected, and so is one with fewer than five: otherwise the
// missing fields would silently stay 0 and be applied as the new offset.
//
// Returns true only when the db was replaced and the offset updated.
// Returns false when the info file is absent, cannot be opened, is
// malformed, names a master other than g_pika_server's, or ChangeDb() fails.
bool Partition::TryUpdateMasterOffset() {
  // Number of lines a complete info file carries (see layout above).
  const int kInfoLineCount = 5;

  std::string info_path = dbsync_path_ + kBgsaveInfoFile;
  if (!slash::FileExists(info_path)) {
    return false;
  }

  // Got new binlog offset
  std::ifstream is(info_path);
  if (!is) {
    LOG(WARNING) << "Partition: " << partition_name_
      << ", Failed to open info file after db sync";
    return false;
  }
  std::string line, master_ip;
  int lineno = 0;
  int64_t filenum = 0, offset = 0, tmp = 0, master_port = 0;
  while (std::getline(is, line)) {
    lineno++;
    if (lineno == 2) {
      master_ip = line;
    } else if (lineno > 2 && lineno <= kInfoLineCount) {
      if (!slash::string2l(line.data(), line.size(), &tmp) || tmp < 0) {
        LOG(WARNING) << "Partition: " << partition_name_
          << ", Format of info file after db sync error, line : " << line;
        return false;  // the stream closes itself on scope exit
      }
      if (lineno == 3) {
        master_port = tmp;
      } else if (lineno == 4) {
        filenum = tmp;
      } else {
        offset = tmp;
      }
    } else if (lineno > kInfoLineCount) {
      LOG(WARNING) << "Partition: " << partition_name_
        << ", Format of info file after db sync error, line : " << line;
      return false;
    }
  }
  // Close explicitly: the file is deleted and its directory renamed below.
  is.close();

  // An incomplete file is left in place so a later call can retry.
  if (lineno < kInfoLineCount) {
    LOG(WARNING) << "Partition: " << partition_name_
      << ", Format of info file after db sync error, incomplete file, lines read: "
      << lineno;
    return false;
  }

  LOG(INFO) << "Partition: " << partition_name_ << "Information from dbsync info"
    << ", master_ip: " << master_ip
    << ", master_port: " << master_port
    << ", filenum: " << filenum
    << ", offset: " << offset;

  // Sanity check
  if (master_ip != g_pika_server->master_ip() ||
      master_port != g_pika_server->master_port()) {
    LOG(WARNING) << "Partition: " << partition_name_
      << "Error master ip port: " << master_ip << ":" << master_port;
    return false;
  }

  // Remove the info file before dbsync_path_ is handed to ChangeDb().
  slash::DeleteFile(info_path);
  if (!ChangeDb(dbsync_path_)) {
    LOG(WARNING) << "Partition: " << partition_name_
      << ", Failed to change db";
    return false;
  }

  // Update master offset
  logger_->SetProducerStatus(filenum, offset);
  full_sync_ = false;
  MarkTryConnectState();
  return true;
}

/*
* Change a new db locate in new_path
* return true when change success
* db remain the old one if return false
*/
bool Partition::ChangeDb(const std::string& new_path) {

blackwidow::BlackwidowOptions bw_option;
RocksdbOptionInit(&bw_option);

std::string tmp_path(db_path_);
if (tmp_path.back() == '/') {
tmp_path.resize(tmp_path.size() - 1);
}
tmp_path += "_bak";
slash::DeleteDirIfExist(tmp_path);

RWLock l(&db_rwlock_, true);
LOG(INFO) << "Partition: "<< partition_name_
<< ", Prepare change db from: " << tmp_path;
db_.reset();

if (0 != slash::RenameFile(db_path_.c_str(), tmp_path)) {
LOG(WARNING) << "Partition: " << partition_name_
<< ", Failed to rename db path when change db, error: " << strerror(errno);
return false;
}

if (0 != slash::RenameFile(new_path.c_str(), db_path_.c_str())) {
LOG(WARNING) << "Partition: " << partition_name_
<< ", Failed to rename new db path when change db, error: " << strerror(errno);
return false;
}

db_.reset(new blackwidow::BlackWidow());
rocksdb::Status s = db_->Open(bw_option, db_path_);
assert(db_);
assert(s.ok());
slash::DeleteDirIfExist(tmp_path);
LOG(INFO) << "Partition:" << partition_name_ << ", Change db success";
return true;
}

bool Partition::IsBgSaving() {
slash::MutexLock ml(&bgsave_protector_);
return bgsave_info_.bgsaving;
Expand Down
9 changes: 7 additions & 2 deletions src/pika_repl_bgworker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,8 @@ void PikaReplBgWorker::HandleTrySyncRequest(void* arg) {
InnerMessage::Node node = try_sync_request.node();
LOG(INFO) << "Trysync, Slave ip: " << node.ip() << ", Slave port:"
<< node.port() << ", Partition: " << partition_name << ", filenum: "
<< slave_boffset.filenum() << ", pro_offset: " << slave_boffset.offset();
<< slave_boffset.filenum() << ", pro_offset: " << slave_boffset.offset()
<< ", force: " << (force ? "yes" : "no");

InnerMessage::InnerResponse response;
response.set_type(InnerMessage::Type::kTrySync);
Expand All @@ -284,6 +285,9 @@ void PikaReplBgWorker::HandleTrySyncRequest(void* arg) {
partition_response->set_table_name(table_name);
partition_response->set_partition_id(partition_id);
if (force) {
LOG(INFO) << "Partition: " << partition_name << " force full sync, BgSave and DbSync first";
g_pika_server->TryDBSync(node.ip(), node.port() + 4000, table_name, partition_id, slave_boffset.filenum());
try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kWait);
} else {
BinlogOffset boffset;
if (!g_pika_server->GetTablePartitionBinlogOffset(table_name, partition_id, &boffset)) {
Expand All @@ -297,7 +301,8 @@ void PikaReplBgWorker::HandleTrySyncRequest(void* arg) {
LOG(WARNING) << "Slave offset is larger than mine, Slave ip: "
<< node.ip() << ", Slave port: " << node.port() << ", Partition: "
<< partition_name << ", filenum: " << slave_boffset.filenum()
<< ", pro_offset_: " << slave_boffset.offset();
<< ", pro_offset_: " << slave_boffset.offset() << ", force: "
<< (force ? "yes" : "no");
} else {
try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kOk);
try_sync_response->set_sid(0);
Expand Down
4 changes: 2 additions & 2 deletions src/pika_repl_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ Status PikaReplClient::SendMetaSync() {

Status PikaReplClient::SendPartitionTrySync(const std::string& table_name,
uint32_t partition_id,
const BinlogOffset& boffset) {
const BinlogOffset& boffset,
bool force_sync) {
InnerMessage::InnerRequest request;
request.set_type(InnerMessage::kTrySync);
InnerMessage::InnerRequest::TrySync* try_sync = request.mutable_try_sync();
Expand All @@ -203,7 +204,6 @@ Status PikaReplClient::SendPartitionTrySync(const std::string& table_name,
partition->set_table_name(table_name);
partition->set_partition_id(partition_id);

bool force_sync = g_pika_server->force_full_sync();
try_sync->set_force(force_sync);
InnerMessage::BinlogOffset* binlog_offset = try_sync->mutable_binlog_offset();
binlog_offset->set_filenum(force_sync ? 0 : boffset.filenum);
Expand Down
19 changes: 13 additions & 6 deletions src/pika_repl_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,23 @@ void PikaReplClientConn::HandleTrySyncResponse(void* arg) {
const InnerMessage::Partition partition_response = try_sync_response.partition();
std::string table_name = partition_response.table_name();
uint32_t partition_id = partition_response.partition_id();
std::string partition_name = table_name + "_" + std::to_string(partition_id);
std::shared_ptr<Partition> partition = g_pika_server->GetTablePartitionById(table_name, partition_id);
if (!partition) {
LOG(WARNING) << "Partition: " << table_name << ":" << partition_id << " Not Found";
delete resp_arg;
return;
}

if (try_sync_response.reply_code() == InnerMessage::InnerResponse::TrySync::kError) {
LOG(WARNING) << "Partition: " << partition_name << " TrySync Error";
std::string partition_name = partition->GetPartitionName();
if (try_sync_response.reply_code() == InnerMessage::InnerResponse::TrySync::kOk) {
LOG(INFO) << "Partition: " << partition_name << " TrySync Ok";
} else if (try_sync_response.reply_code() == InnerMessage::InnerResponse::TrySync::kWait) {
LOG(WARNING) << "Partition: " << partition_name << " Need wait to sync";
partition->MarkWaitDBSyncState();
LOG(INFO) << "Partition: " << partition_name << " Need Wait To Sync";
} else if (try_sync_response.reply_code() == InnerMessage::InnerResponse::TrySync::kInvalidOffset) {
LOG(WARNING) << "Partition: " << partition_name << " TrySync Error, Because the invalid filenum and offset";
} else if (try_sync_response.reply_code() == InnerMessage::InnerResponse::TrySync::kOk) {
LOG(INFO) << "Partition: " << partition_name << " TrySync Ok";
} else if (try_sync_response.reply_code() == InnerMessage::InnerResponse::TrySync::kError) {
LOG(WARNING) << "Partition: " << partition_name << " TrySync Error";
}
delete resp_arg;
}
Expand Down
Loading

0 comments on commit 5e4c21c

Please sign in to comment.