Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dbslaveof support force option #651

Merged
merged 1 commit into from
Jun 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
dbslaveof support force option
  • Loading branch information
Axlgrep committed Jun 18, 2019
commit 8759e7d661a1992c0eb35ffdf74f3207d8faff98
4 changes: 4 additions & 0 deletions include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,17 @@ class DbSlaveofCmd : public Cmd {

private:
std::string db_name_;
bool force_sync_;
bool is_noone_;
bool have_offset_;
int64_t filenum_;
int64_t offset_;
virtual void DoInitial() override;
virtual void Clear() {
db_name_.clear();
force_sync_ = false;
is_noone_ = false;
have_offset_ = false;
}
};

Expand Down
5 changes: 0 additions & 5 deletions include/pika_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,6 @@ class Partition : public std::enable_shared_from_this<Partition> {
ReplState State();
void SetReplState(const ReplState& state);

bool FullSync();
void SetFullSync(bool full_sync);

void PrepareRsync();
bool TryUpdateMasterOffset();
bool ChangeDb(const std::string& new_path);
Expand Down Expand Up @@ -125,8 +122,6 @@ class Partition : public std::enable_shared_from_this<Partition> {
slash::RecordMutex mutex_record_;
std::shared_ptr<blackwidow::BlackWidow> db_;

bool full_sync_;

/*
* BgSave use
*/
Expand Down
43 changes: 30 additions & 13 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ void SlaveofCmd::Do(std::shared_ptr<Partition> partition) {
}

/*
* dbslaveof db[0 ~ 7]
* dbslaveof db[0 ~ 7] force
* dbslaveof db[0 ~ 7] no one
* dbslaveof db[0 ~ 7] filenum offset
*/
Expand All @@ -121,19 +123,28 @@ void DbSlaveofCmd::DoInitial() {
return;
}

if (!strcasecmp(argv_[2].data(), "no")
&& !strcasecmp(argv_[3].data(), "one")) {
is_noone_ = true;
if (argv_.size() == 3
&& !strcasecmp(argv_[2].data(), "force")) {
force_sync_ = true;
return;
}

if (!slash::string2l(argv_[2].data(), argv_[2].size(), &filenum_) || filenum_ < 0) {
res_.SetRes(CmdRes::kInvalidInt);
return;
}
if (!slash::string2l(argv_[3].data(), argv_[3].size(), &offset_) || offset_ < 0) {
res_.SetRes(CmdRes::kInvalidInt);
return;
if (argv_.size() == 4) {
if (!strcasecmp(argv_[2].data(), "no")
&& !strcasecmp(argv_[3].data(), "one")) {
is_noone_ = true;
return;
}

if (!slash::string2l(argv_[2].data(), argv_[2].size(), &filenum_) || filenum_ < 0) {
res_.SetRes(CmdRes::kInvalidInt);
return;
}
if (!slash::string2l(argv_[3].data(), argv_[3].size(), &offset_) || offset_ < 0) {
res_.SetRes(CmdRes::kInvalidInt);
return;
}
have_offset_ = true;
}
}

Expand Down Expand Up @@ -161,9 +172,15 @@ void DbSlaveofCmd::Do(std::shared_ptr<Partition> partition) {
}
} else {
if (slave_partition->State() == ReplState::kNoConnect
|| slave_partition->State() == ReplState::kError) {
db_partition->logger()->SetProducerStatus(filenum_, offset_);
slave_partition->SetReplState(ReplState::kTryConnect);
|| slave_partition->State() == ReplState::kError) {
if (force_sync_) {
slave_partition->SetReplState(ReplState::kTryDBSync);
} else {
if (have_offset_) {
db_partition->logger()->SetProducerStatus(filenum_, offset_);
}
slave_partition->SetReplState(ReplState::kTryConnect);
}
g_pika_server->SetLoopPartitionStateMachine(true);
}
}
Expand Down
8 changes: 1 addition & 7 deletions src/pika_auxiliary_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,7 @@ void PikaAuxiliaryThread::RunEveryPartitionStateMachine() {
continue;
}
if (slave_partition->State() == ReplState::kTryConnect) {
// If partition need to FullSync, we Send DBSync Request
// directly, instead of TrySync first
if (partition->FullSync()) {
g_pika_server->SendPartitionDBSyncRequest(partition);
} else {
g_pika_server->SendPartitionTrySyncRequest(partition);
}
g_pika_server->SendPartitionTrySyncRequest(partition);
} else if (slave_partition->State() == ReplState::kTryDBSync) {
g_pika_server->SendPartitionDBSyncRequest(partition);
} else if (slave_partition->State() == ReplState::kWaitReply) {
Expand Down
2 changes: 1 addition & 1 deletion src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ void InitCmdTable(std::unordered_map<std::string, Cmd*> *cmd_table) {
////Slaveof
Cmd* slaveofptr = new SlaveofCmd(kCmdNameSlaveof, -3, kCmdFlagsRead | kCmdFlagsAdmin);
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameSlaveof, slaveofptr));
Cmd* dbslaveofptr = new DbSlaveofCmd(kCmdNameDbSlaveof, 4, kCmdFlagsRead | kCmdFlagsAdmin);
Cmd* dbslaveofptr = new DbSlaveofCmd(kCmdNameDbSlaveof, -2, kCmdFlagsRead | kCmdFlagsAdmin);
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameDbSlaveof, dbslaveofptr));
Cmd* authptr = new AuthCmd(kCmdNameAuth, 2, kCmdFlagsRead | kCmdFlagsAdmin);
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameAuth, authptr));
Expand Down
10 changes: 0 additions & 10 deletions src/pika_partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ Partition::Partition(const std::string& table_name,
table_name_(table_name),
partition_id_(partition_id),
binlog_io_error_(false),
full_sync_(false),
bgsave_engine_(NULL),
purging_(false) {

Expand Down Expand Up @@ -260,14 +259,6 @@ bool Partition::SetBinlogOffset(const BinlogOffset& boffset) {
return false;
}

bool Partition::FullSync() {
return full_sync_;
}

void Partition::SetFullSync(bool full_sync) {
full_sync_ = full_sync;
}

void Partition::PrepareRsync() {
slash::DeleteDirIfExist(dbsync_path_);
slash::CreatePath(dbsync_path_ + "strings");
Expand Down Expand Up @@ -358,7 +349,6 @@ bool Partition::TryUpdateMasterOffset() {

// Update master offset
logger_->SetProducerStatus(filenum, offset);
full_sync_ = false;
slave_partition->SetReplState(ReplState::kTryConnect);
return true;
}
Expand Down
8 changes: 4 additions & 4 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -515,13 +515,13 @@ Status PikaServer::DoSameThingSpecificTable(const TaskType& type, const std::set

void PikaServer::PreparePartitionTrySync() {
slash::RWLock rwl(&tables_rw_, false);
ReplState state = force_full_sync_ ?
ReplState::kTryDBSync : ReplState::kTryConnect;
for (const auto& table_item : tables_) {
for (const auto& partition_item : table_item.second->partitions_) {
partition_item.second->SetFullSync(force_full_sync_);
Status s = g_pika_rm->SetSlaveReplState(
RmNode(table_item.second->GetTableName(),
partition_item.second->GetPartitionId()),
ReplState::kTryConnect);
RmNode(table_item.second->GetTableName(),
partition_item.second->GetPartitionId()), state);
if (!s.ok()) {
LOG(WARNING) << s.ToString();
}
Expand Down