Skip to content

Commit

Permalink
master prepare for DbSyncSendFile (#514)
Browse files Browse the repository at this point in the history
  • Loading branch information
Axlgrep committed May 15, 2019
1 parent 8643f48 commit 08636e4
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 14 deletions.
12 changes: 6 additions & 6 deletions include/pika_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@

class Cmd;

struct BGSaveInfo {
struct BgSaveInfo {
bool bgsaving;
time_t start_time;
std::string s_start_time;
std::string path;
uint32_t filenum;
uint64_t offset;
BGSaveInfo() : bgsaving(false), filenum(0), offset(0){}
BgSaveInfo() : bgsaving(false), filenum(0), offset(0) {}
void Clear() {
bgsaving = false;
path.clear();
Expand All @@ -42,8 +42,8 @@ enum ReplState {
kNoConnect = 0,
kTryConnect = 1,
kWaitReply = 2,
kConnected = 3,
kWaitDBSync = 4,
kWaitDBSync = 3,
kConnected = 4,
};

// debug only
Expand Down Expand Up @@ -95,7 +95,7 @@ class Partition : public std::enable_shared_from_this<Partition> {
// BgSave use;
bool IsBgSaving();
void BgSavePartition();
BGSaveInfo bgsave_info();
BgSaveInfo bgsave_info();

// Flushall & Flushdb use
bool FlushAll();
Expand Down Expand Up @@ -139,7 +139,7 @@ class Partition : public std::enable_shared_from_this<Partition> {
bool InitBgsaveEngine();
void ClearBgsave();
void FinishBgsave();
BGSaveInfo bgsave_info_;
BgSaveInfo bgsave_info_;
slash::Mutex bgsave_protector_;
blackwidow::BackupEngine* bgsave_engine_;

Expand Down
31 changes: 31 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,35 @@ class PikaServer {
}
slash::Mutex & GetSlavesMutex() { return db_sync_protector_; }


struct NewDBSyncArg {
PikaServer* p;
std::string ip;
int port;
std::string table_name;
uint32_t partition_id;
NewDBSyncArg(PikaServer* const _p,
const std::string& _ip,
int _port,
const std::string& _table_name,
uint32_t _partition_id)
: p(_p), ip(_ip), port(_port),
table_name(_table_name), partition_id(_partition_id) {}
};
void TryDBSync(const std::string& ip, int port,
const std::string& table_name,
uint32_t partition_id, int32_t top);
std::string DbSyncTaskIndex(const std::string& ip, int port,
const std::string& table_name,
uint32_t partition_id);
void DBSync(const std::string& ip, int port,
const std::string& table_name,
uint32_t partition_id);
void NewDbSyncSendFile(const std::string& ip, int port,
const std::string& table_name,
uint32_t partition_id);


//flushall & flushdb
void PurgeDir(const std::string& path);
void PurgeDirTaskSchedule(void (*function)(void*), void* arg);
Expand Down Expand Up @@ -533,6 +562,8 @@ class PikaServer {
void DBSync(const std::string& ip, int port);
static void DoDBSync(void* arg);

static void NewDoDbSync(void* arg);

/*
* Keyscan use
*/
Expand Down
1 change: 0 additions & 1 deletion src/pika_inner_message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ enum Type {
kMetaSync = 1;
kTrySync = 2;
kBinlogSync = 3;
kDbSync = 4;
}

enum StatusCode {
Expand Down
6 changes: 3 additions & 3 deletions src/pika_partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ void Partition::BgSavePartition() {
g_pika_server->BGSaveTaskSchedule(&DoBgSave, static_cast<void*>(bg_task_arg));
}

BGSaveInfo Partition::bgsave_info() {
BgSaveInfo Partition::bgsave_info() {
slash::MutexLock l(&bgsave_protector_);
return bgsave_info_;
}
Expand All @@ -291,7 +291,7 @@ void Partition::DoBgSave(void* arg) {
bool success = bg_task_arg->partition->RunBgsaveEngine();

// Some output
BGSaveInfo info = bg_task_arg->partition->bgsave_info();
BgSaveInfo info = bg_task_arg->partition->bgsave_info();
std::ofstream out;
out.open(info.path + "/" + kBgsaveInfoFile, std::ios::in | std::ios::trunc);
if (out.is_open()) {
Expand Down Expand Up @@ -319,7 +319,7 @@ bool Partition::RunBgsaveEngine() {
}
LOG(INFO) << partition_name_ << " after prepare bgsave";

BGSaveInfo info = bgsave_info();
BgSaveInfo info = bgsave_info();
LOG(INFO) << partition_name_ << " bgsave_info: path=" << info.path
<< ", filenum=" << info.filenum
<< ", offset=" << info.offset;
Expand Down
12 changes: 8 additions & 4 deletions src/pika_repl_server_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ int PikaReplServerConn::DealMessage() {
break;
case InnerMessage::kBinlogSync:
res = HandleBinlogSync(req);
case InnerMessage::kDbSync:
break;
default:
break;
}
Expand Down Expand Up @@ -88,7 +86,8 @@ int PikaReplServerConn::HandleTrySync(const InnerMessage::InnerRequest& req) {
InnerMessage::Node node = try_sync_request.node();
LOG(INFO) << "Trysync, Slave ip: " << node.ip() << ", Slave port:"
<< node.port() << ", Partition: " << partition_name << ", filenum: "
<< slave_boffset.filenum() << ", pro_offset: " << slave_boffset.offset();
<< slave_boffset.filenum() << ", pro_offset: " << slave_boffset.offset()
<< ", force: " << (force ? "yes" : "no");

InnerMessage::InnerResponse response;
response.set_type(InnerMessage::Type::kTrySync);
Expand All @@ -98,6 +97,9 @@ int PikaReplServerConn::HandleTrySync(const InnerMessage::InnerRequest& req) {
partition_response->set_table_name(table_name);
partition_response->set_partition_id(partition_id);
if (force) {
LOG(INFO) << "Partition: " << partition_name << " force full sync, BgSave and DbSync first";
g_pika_server->TryDBSync(node.ip(), node.port(), table_name, partition_id, slave_boffset.filenum());
try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kWait);
} else {
BinlogOffset boffset;
if (!g_pika_server->GetTablePartitionBinlogOffset(table_name, partition_id, &boffset)) {
Expand All @@ -111,8 +113,10 @@ int PikaReplServerConn::HandleTrySync(const InnerMessage::InnerRequest& req) {
LOG(WARNING) << "Slave offset is larger than mine, Slave ip: "
<< node.ip() << ", Slave port: " << node.port() << ", Partition: "
<< partition_name << ", filenum: " << slave_boffset.filenum()
<< ", pro_offset_: " << slave_boffset.offset();
<< ", pro_offset_: " << slave_boffset.offset() << ", force: "
<< (force ? "yes" : "no");
} else {
LOG(INFO) << "Partition: " << partition_name << " TrySync success";
try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kOk);
try_sync_response->set_sid(0);
}
Expand Down
63 changes: 63 additions & 0 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,69 @@ bool PikaServer::ChangeDb(const std::string& new_path) {
return true;
}

void PikaServer::TryDBSync(const std::string& ip, int port,
const std::string& table_name,
uint32_t partition_id, int32_t top) {
std::shared_ptr<Partition> partition =
GetTablePartitionById(table_name, partition_id);
if (!partition) {
LOG(WARNING) << "Partition: " << partition->GetPartitionName()
<< " Not Found, TryDBSync Failed";
} else {
BgSaveInfo bgsave_info = partition->bgsave_info();
std::string logger_filename = partition->logger()->filename;
if (slash::IsDir(bgsave_info.path) != 0
|| !slash::FileExists(NewFileName(logger_filename, bgsave_info.filenum))
|| top - bgsave_info.filenum > kDBSyncMaxGap) {
// Need Bgsave first
partition->BgSavePartition();
}
DBSync(ip, port, table_name, partition_id);
}
}

std::string PikaServer::DbSyncTaskIndex(const std::string& ip,
int port,
const std::string& table_name,
uint32_t partition_id) {
char buf[256];
snprintf(buf, sizeof(buf), "%s:%d_%s:%d",
ip.data(), port, table_name.data(), partition_id);
return buf;
}

void PikaServer::DBSync(const std::string& ip, int port,
const std::string& table_name,
uint32_t partition_id) {
{
std::string task_index =
DbSyncTaskIndex(ip, port, table_name, partition_id);
slash::MutexLock ml(&db_sync_protector_);
if (db_sync_slaves_.find(task_index) != db_sync_slaves_.end()) {
return;
}
db_sync_slaves_.insert(task_index);
}
// Reuse the bgsave_thread_
// Since we expect BgSave and DBSync execute serially
bgsave_thread_.StartThread();
NewDBSyncArg* arg = new NewDBSyncArg(this, ip, port, table_name, partition_id);
bgsave_thread_.Schedule(&NewDoDbSync, reinterpret_cast<void*>(arg));
}

void PikaServer::NewDoDbSync(void* arg) {
NewDBSyncArg* dbsa = reinterpret_cast<NewDBSyncArg*>(arg);
PikaServer* const ps = dbsa->p;
ps->NewDbSyncSendFile(dbsa->ip, dbsa->port,
dbsa->table_name, dbsa->partition_id);
delete dbsa;
}

void PikaServer::NewDbSyncSendFile(const std::string& ip, int port,
const std::string& table_name,
uint32_t partition_id) {
}

void PikaServer::MayUpdateSlavesMap(int64_t sid, int32_t hb_fd) {
slash::MutexLock l(&slave_mutex_);
std::vector<SlaveItem>::iterator iter = slaves_.begin();
Expand Down

0 comments on commit 08636e4

Please sign in to comment.