Skip to content

Commit

Permalink
remove StopSync state (#634)
Browse files Browse the repository at this point in the history
  • Loading branch information
Axlgrep committed Jun 13, 2019
1 parent 2591f83 commit 3e0b21e
Show file tree
Hide file tree
Showing 9 changed files with 30 additions and 14 deletions.
3 changes: 1 addition & 2 deletions include/pika_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ enum ReplState {
kWaitDBSync = 3,
kWaitReply = 4,
kConnected = 5,
kStopSync = 6,
kError = 7
kError = 6
};

// debug only
Expand Down
6 changes: 3 additions & 3 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ void DbSlaveofCmd::Do(std::shared_ptr<Partition> partition) {
Status s;
if (is_noone_) {
if (db_partition->State() == ReplState::kConnected) {
db_partition->SetReplState(ReplState::kStopSync);
db_partition->SetReplState(ReplState::kNoConnect);
s = g_pika_server->SendRemoveSlaveNodeRequest(
db_partition->GetTableName(), db_partition->GetPartitionId());
}
Expand Down Expand Up @@ -731,8 +731,8 @@ void InfoCmd::InfoReplication(std::string& info) {
if (patition_item.second->State() != ReplState::kConnected) {
all_partition_sync = false;
out_of_sync << "(" << patition_item.second->GetPartitionName() << ":";
if (patition_item.second->State() == ReplState::kStopSync) {
out_of_sync << "Stop)";
if (patition_item.second->State() == ReplState::kNoConnect) {
out_of_sync << "NoConnect)";
} else if (patition_item.second->State() == ReplState::kWaitDBSync) {
out_of_sync << "WaitDBSync)";
} else if (patition_item.second->State() == ReplState::kError) {
Expand Down
2 changes: 1 addition & 1 deletion src/pika_auxiliary_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ void PikaAuxiliaryThread::RunEveryPartitionStateMachine() {
} else if (partition->State() == ReplState::kWaitDBSync) {
partition->TryUpdateMasterOffset();
} else if (partition->State() == ReplState::kConnected
|| partition->State() == ReplState::kStopSync) {
|| partition->State() == ReplState::kNoConnect) {
count++;
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/pika_inner_message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ message InnerResponse {
}

message RemoveSlaveNode {
required Partition partition = 1;
required Node node = 1;
required Partition partition = 2;
}

required Type type = 1;
Expand Down
21 changes: 17 additions & 4 deletions src/pika_repl_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ void PikaReplClientConn::HandleDBSyncResponse(void* arg) {
g_pika_server->master_port(), table_name, partition_id, session_id));

std::string partition_name = partition->GetPartitionName();
partition->SetReplState(ReplState::kWaitDBSync);
LOG(INFO) << "Partition: " << partition_name << " Need Wait To Sync";
delete task_arg;
}
Expand Down Expand Up @@ -237,17 +238,29 @@ void PikaReplClientConn::HandleRemoveSlaveNodeResponse(void* arg) {
std::shared_ptr<pink::PbConn> conn = task_arg->conn;
std::shared_ptr<InnerMessage::InnerResponse> response = task_arg->res;
const InnerMessage::InnerResponse_RemoveSlaveNode remove_slave_node_response = response->remove_slave_node();
const InnerMessage::Partition partition_response = remove_slave_node_response.partition();
const InnerMessage::Partition partition_res = remove_slave_node_response.partition();
const InnerMessage::Node node_res = remove_slave_node_response.node();

if (response->code() != InnerMessage::kOk) {
std::string reply = response->has_reply() ? response->reply() : "";
LOG(WARNING) << "Remove slave node Failed: " << reply;
delete task_arg;
return;
}
LOG(INFO) << "Master remove slave node success"
<< ", table name:" << partition_response.table_name()
<< ", partition id:" << partition_response.partition_id();
Status s = g_pika_rm->RemoveSyncSlavePartition(RmNode(node_res.ip(),
node_res.port(), partition_res.table_name(), partition_res.partition_id()));
if (s.ok()) {
LOG(INFO) << "Master remove slave node success"
<< ", ip_port:" << node_res.ip() << ":" << node_res.port()
<< ", table name:" << partition_res.table_name()
<< ", partition id:" << partition_res.partition_id();
} else {
LOG(WARNING) << "Master remove slave node failed"
<< ", ip_port:" << node_res.ip() << ":" << node_res.port()
<< ", table name:" << partition_res.table_name()
<< ", partition id:" << partition_res.partition_id()
<< ", " << s.ToString();
}
delete task_arg;
}

3 changes: 3 additions & 0 deletions src/pika_repl_server_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,9 @@ void PikaReplServerConn::HandleRemoveSlaveNodeRequest(void* arg) {
InnerMessage::Partition* partition_response = remove_slave_node_response->mutable_partition();
partition_response->set_table_name(table_name);
partition_response->set_partition_id(partition_id);
InnerMessage::Node* node_response = remove_slave_node_response->mutable_node();
node_response->set_ip(g_pika_server->host());
node_response->set_port(g_pika_server->port());

std::string reply_str;
if (!response.SerializeToString(&reply_str)
Expand Down
3 changes: 1 addition & 2 deletions src/pika_rm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -536,8 +536,7 @@ Status SyncSlavePartition::CheckSyncTimeout(uint64_t now, bool* del) {
LOG(WARNING) << "Partition: " << m_info_.TableName()
<< ":" << m_info_.PartitionId() << " Not Found";
} else {
if (partition->State() != ReplState::kStopSync
&& m_info_.LastRecvTime() + kRecvKeepAliveTimeout < now) {
if (m_info_.LastRecvTime() + kRecvKeepAliveTimeout < now) {
partition->SetReplState(ReplState::kTryConnect);
g_pika_server->SetLoopPartitionStateMachine(true);
*del = true;
Expand Down
1 change: 1 addition & 0 deletions src/pika_rsync_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ int PikaRsyncService::StartRsync() {
ret = CreateSecretFile();
if (ret != 0) {
LOG(WARNING) << "Failed to create secret file";
return -1;
}
// Make sure the listening addr of rsyncd is accessible, avoid the corner case
// that rsync --daemon process is started but not finished listening on the socket
Expand Down
2 changes: 1 addition & 1 deletion src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1199,7 +1199,7 @@ Status PikaServer::SendPartitionDBSyncRequest(std::shared_ptr<Partition> partiti
std::string table_name = partition->GetTableName();
uint32_t partition_id = partition->GetPartitionId();
Status status = g_pika_rm->GetPikaReplClient()->SendPartitionDBSync(table_name, partition_id, boffset);
partition->SetReplState(ReplState::kWaitDBSync);
partition->SetReplState(ReplState::kWaitReply);
return status;
}

Expand Down

0 comments on commit 3e0b21e

Please sign in to comment.