Skip to content

Commit

Permalink
Feature: sync all snapshot data in Pika process without rsync subproc…
Browse files Browse the repository at this point in the history
…ess (OpenAtomFoundation#1805)

* define rsync related header file and proto

* feat:add throttle (OpenAtomFoundation#167)

* add_throttle

* feat: implement rsync network tansform (OpenAtomFoundation#169)

implement rsync network transfer

Co-authored-by: wangshaoyi <wangshaoyi@meituan.com>

* fix action (OpenAtomFoundation#171)

* fix action

* fix

* feat: add load local meta file (OpenAtomFoundation#175)

* add load meta file

* [feat] add rsync client/server code (OpenAtomFoundation#177)

* implement rsync network transfer

* add rsyncclient to syncslaveslot (OpenAtomFoundation#182)

add rsyncclient to syncslaveslot

* feat: add read meta file and data (OpenAtomFoundation#179)

* add read meta file and data

* fix compile error (OpenAtomFoundation#183)

* fix bug

* add rsyncclient to syncslaveslot

* fix compile error

* fix compile error

---------

Co-authored-by: wangshaoyi <wangshaoyi@meituan.com>

* fix compile error (OpenAtomFoundation#184)

* fix bug

* optimize: add_throttle (OpenAtomFoundation#189)

optimize throttle

* rsyncclient periodically flush meta table (OpenAtomFoundation#192)

rsyncclient periodically flush meta table

* change rsync response (OpenAtomFoundation#190)

* change rsync response

* add debug log for test

* fix rsync client/server bugs

* fix bugs

* add debug log for test

* fix bugs

* fix bugs

* fix bugs

* rix rsync bugs (OpenAtomFoundation#194)

* fix pika rsync bug

* fix bugs

* fix bugs

* fix bugs 1

* fix bugs

* fix rsync bugs (OpenAtomFoundation#195)

* add debug log for test

* fix rsync client/server bugs

* fix bugs

* add debug log for test

* fix bugs

* fix bugs

* fix bugs

* fix bugs

* fix bugs

* fix bugs 1

* fix bugs

---------

Co-authored-by: wangshaoyi <wangshaoyi@meituan.com>

* remove unused code

* remove unused code

* remove unused code

* remove unused code

* add copyright

* fix by review comments (OpenAtomFoundation#213)

Co-authored-by: wangshaoyi <wangshaoyi@meituan.com>

* fix by review comments (OpenAtomFoundation#214)

* fix by review comments
Co-authored-by: wangshaoyi <wangshaoyi@meituan.com>

* fix by review comments (OpenAtomFoundation#216)

* fix by review comments
---------

Co-authored-by: wangshaoyi <wangshaoyi@meituan.com>

* Optimize rsync wangsy (OpenAtomFoundation#217)

* fix by review comments

Co-authored-by: wangshaoyi <wangshaoyi@meituan.com>

* fix by review comments (OpenAtomFoundation#218)

* fix by review comments
---------

Co-authored-by: wangshaoyi <wangshaoyi@meituan.com>

---------

Co-authored-by: wangshaoyi <wangshaoyi@meituan.com>
Co-authored-by: Yuecai Liu <38887641+luky116@users.noreply.github.com>
Co-authored-by: chejinge <945997690@qq.com>
Co-authored-by: luky116 <luky116@126.com>
  • Loading branch information
5 people authored Jul 26, 2023
1 parent c319cdb commit 5c3e603
Show file tree
Hide file tree
Showing 22 changed files with 1,418 additions and 73 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/operator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:

- name: Build
run: |
cd tools/pika_operator && make
cd tools/pika_operator && make
- name: Unit Test
run: |
Expand Down
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,7 @@ set(PIKA_BUILD_VERSION_CC ${CMAKE_BINARY_DIR}/pika_build_version.cc)
message("PIKA_BUILD_VERSION_CC : " ${PIKA_BUILD_VERSION_CC})
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/src/build_version.cc.in ${PIKA_BUILD_VERSION_CC} @ONLY)

set(PROTO_FILES ${CMAKE_CURRENT_SOURCE_DIR}/src/pika_inner_message.proto)
set(PROTO_FILES ${CMAKE_CURRENT_SOURCE_DIR}/src/pika_inner_message.proto ${CMAKE_CURRENT_SOURCE_DIR}/src/rsync_service.proto)
custom_protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS ${PROTO_FILES})
message("pika PROTO_SRCS = ${PROTO_SRCS}")
message("pika PROTO_HDRS = ${PROTO_HDRS}")
Expand Down
3 changes: 2 additions & 1 deletion include/pika_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ class PikaServer;
/* Port shift */
const int kPortShiftRSync = 1000;
const int kPortShiftReplServer = 2000;

//TODO: Temporarily used for rsync server port shift. will be deleted.
const int kPortShiftRsync2 = 10001;
const std::string kPikaPidFile = "pika.pid";
const std::string kPikaSecretFile = "rsync.secret";
const std::string kDefaultRsyncAuth = "default";
Expand Down
6 changes: 6 additions & 0 deletions include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "include/pika_repl_server.h"
#include "include/pika_slave_node.h"
#include "include/pika_stable_log.h"
#include "include/rsync_client.h"

#define kBinlogSendPacketNum 40
#define kBinlogSendBatchNum 100
Expand Down Expand Up @@ -157,7 +158,12 @@ class SyncSlaveSlot : public SyncSlot {

std::string LocalIp();

void ActivateRsync();

bool IsRsyncRunning() {return rsync_cli_->IsRunning();}

private:
std::unique_ptr<rsync::RsyncClient> rsync_cli_;
pstd::Mutex slot_mu_;
RmNode m_info_;
ReplState repl_state_{kNoConnect};
Expand Down
4 changes: 4 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "include/pika_repl_client.h"
#include "include/pika_repl_server.h"
#include "include/pika_rsync_service.h"
#include "include/rsync_server.h"
#include "include/pika_statistic.h"
#include "include/pika_slot_command.h"
#include "include/pika_migrate_thread.h"
Expand Down Expand Up @@ -261,6 +262,8 @@ class PikaServer : public pstd::noncopyable {
/*
* DBSync used
*/
pstd::Status GetDumpUUID(const std::string& db_name, const uint32_t slot_id, std::string* snapshot_uuid);
pstd::Status GetDumpMeta(const std::string& db_name, const uint32_t slot_id, std::vector<std::string>* files, std::string* snapshot_uuid);
void DBSync(const std::string& ip, int port, const std::string& db_name, uint32_t slot_id);
void TryDBSync(const std::string& ip, int port, const std::string& db_name, uint32_t slot_id, int32_t top);
void DbSyncSendFile(const std::string& ip, int port, const std::string& db_name, uint32_t slot_id);
Expand Down Expand Up @@ -587,6 +590,7 @@ class PikaServer : public pstd::noncopyable {
* Rsync used
*/
std::unique_ptr<PikaRsyncService> pika_rsync_service_;
std::unique_ptr<rsync::RsyncServer> rsync_server_;

/*
* Pubsub used
Expand Down
3 changes: 3 additions & 0 deletions include/pika_slot.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ class Slot : public std::enable_shared_from_this<Slot>,public pstd::noncopyable
bool IsBgSaving();
void BgSaveSlot();
BgSaveInfo bgsave_info();
void GetBgSaveMetaData(std::vector<std::string>* fileNames, std::string* snapshot_uuid);
pstd::Status GetBgSaveUUID(std::string* snapshot_uuid);

// FlushDB & FlushSubDB use
bool FlushDB();
Expand All @@ -93,6 +95,7 @@ class Slot : public std::enable_shared_from_this<Slot>,public pstd::noncopyable
private:
std::string db_name_;
uint32_t slot_id_ = 0;
std::string snapshot_uuid_;

std::string db_path_;
std::string bgsave_sub_path_;
Expand Down
152 changes: 152 additions & 0 deletions include/rsync_client.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Copyright (c) 2023-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef RSYNC_CLIENT_H_
#define RSYNC_CLIENT_H_

#include <glog/logging.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <list>
#include <atomic>
#include <memory>
#include <thread>
#include <condition_variable>

#include "net/include/bg_thread.h"
#include "net/include/net_cli.h"
#include "pstd/include/env.h"
#include "pstd/include/pstd_status.h"
#include "pstd/include/pstd_hash.h"
#include "pstd/include/pstd_string.h"
#include "pstd/include/pstd_status.h"
#include "include/rsync_client_thread.h"
#include "include/throttle.h"
#include "rsync_service.pb.h"

const std::string kDumpMetaFileName = "DUMP_META_DATA";
const std::string kUuidPrefix = "snapshot-uuid:";

namespace rsync {

class RsyncWriter;
class Session;
class WaitObject;

class RsyncClient : public net::Thread {
public:
enum State {
IDLE,
RUNNING,
STOP,
};
RsyncClient(const std::string& dir, const std::string& db_name, const uint32_t slot_id);
void* ThreadMain() override;
bool Init();
Status Start();
Status Stop();
bool IsRunning() {
return state_.load() == RUNNING;
}
bool IsIdle() { return state_.load() == IDLE;}
void OnReceive(RsyncService::RsyncResponse* resp);

private:
bool Recover();
Status Wait(RsyncService::RsyncResponse*& resp);
Status CopyRemoteFile(const std::string& filename);
Status CopyRemoteMeta(std::string* snapshot_uuid, std::set<std::string>* file_set);
Status LoadLocalMeta(std::string* snapshot_uuid, std::map<std::string, std::string>* file_map);
std::string GetLocalMetaFilePath();
Status FlushMetaTable();
Status CleanUpExpiredFiles(bool need_reset_path, const std::set<std::string>& files);
Status UpdateLocalMeta(const std::string& snapshot_uuid, const std::set<std::string>& expired_files, std::map<std::string, std::string>* localFileMap);
void HandleRsyncMetaResponse(RsyncService::RsyncResponse* response);

private:
std::map<std::string, std::string> meta_table_;
int flush_period_ = 10;
std::set<std::string> file_set_;
std::string snapshot_uuid_;
std::string dir_;
std::string db_name_;
uint32_t slot_id_ = 0;
std::unique_ptr<RsyncClientThread> client_thread_;
std::atomic<State> state_;
int max_retries_ = 10;
std::unique_ptr<WaitObject> wo_;
std::condition_variable cond_;
std::mutex mu_;
std::unique_ptr<Throttle> throttle_;
std::string master_ip_;
int master_port_;
};

class RsyncWriter {
public:
RsyncWriter(const std::string& filepath) {
filepath_ = filepath;
fd_ = open(filepath.c_str(), O_RDWR | O_APPEND | O_CREAT, 0644);
}
~RsyncWriter() {}
Status Write(uint64_t offset, size_t n, const char* data) {
const char* ptr = data;
size_t left = n;
Status s;
while (left != 0) {
ssize_t done = write(fd_, ptr, left);
if (done < 0) {
if (errno == EINTR) {
continue;
}
LOG(WARNING) << "pwrite failed, filename: " << filepath_ << "errno: " << strerror(errno) << "n: " << n;
return Status::IOError(filepath_, "pwrite failed");
}
left -= done;
ptr += done;
offset += done;
}
return Status::OK();
}
Status Close() {
close(fd_);
return Status::OK();
}
Status Fsync() {
fsync(fd_);
return Status::OK();
}

private:
std::string filepath_;
int fd_ = -1;
};

class WaitObject {
public:
WaitObject() : filename_(""), type_(RsyncService::kRsyncMeta), offset_(0), resp_(nullptr) {}
~WaitObject() {}
void Reset(const std::string& filename, RsyncService::Type t, size_t offset) {
resp_ = nullptr;
filename_ = filename;
type_ = t;
offset_ = offset;
}
void Reset(RsyncService::Type t) {
resp_ = nullptr;
filename_ = "";
type_ = t;
offset_ = 0xFFFFFFFF;
}
std::string filename_;
RsyncService::Type type_;
size_t offset_ = 0xFFFFFFFF;
RsyncService::RsyncResponse* resp_ = nullptr;
};

} // end namespace rsync
#endif

55 changes: 55 additions & 0 deletions include/rsync_client_thread.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright (c) 2023-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef RSYNC_CLIENT_THREAD_H_
#define RSYNC_CLIENT_THREAD_H_

#include "net/include/client_thread.h"
#include "net/include/net_conn.h"
#include "net/include/pb_conn.h"
#include "rsync_service.pb.h"

using namespace pstd;
using namespace net;

namespace rsync {

class RsyncClientConn : public PbConn {
public:
RsyncClientConn(int fd, const std::string& ip_port,
net::Thread* thread, void* cb_handler,
NetMultiplexer* mpx);
~RsyncClientConn() override;
int DealMessage() override;

private:
void* cb_handler_ = nullptr;
};

class RsyncClientConnFactory : public ConnFactory {
public:
RsyncClientConnFactory(void* scheduler) : cb_handler_(scheduler) {}
std::shared_ptr<net::NetConn> NewNetConn(int connfd, const std::string& ip_port,
net::Thread* thread, void* cb_handler,
net::NetMultiplexer* net) const override {
return std::static_pointer_cast<net::NetConn>(
std::make_shared<RsyncClientConn>(connfd, ip_port, thread, cb_handler_, net));
}
private:
void* cb_handler_ = nullptr;
};

class RsyncClientThread : public ClientThread {
public:
RsyncClientThread(int cron_interval, int keepalive_timeout, void* scheduler);
~RsyncClientThread() override;
private:
RsyncClientConnFactory conn_factory_;
ClientHandle handle_;
};

} //end namespace rsync
#endif

91 changes: 91 additions & 0 deletions include/rsync_server.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright (c) 2023-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef RSYNC_SERVER_H_
#define RSYNC_SERVER_H_

#include <stdio.h>
#include <unistd.h>

#include "net/include/net_conn.h"
#include "net/include/net_thread.h"
#include "net/include/pb_conn.h"
#include "net/include/server_thread.h"
#include "net/include/thread_pool.h"
#include "net/src/holy_thread.h"
#include "net/src/net_multiplexer.h"
#include "pstd/include/env.h"
#include "pstd_hash.h"
#include "rsync_service.pb.h"

namespace rsync {
struct RsyncServerTaskArg {
std::shared_ptr<RsyncService::RsyncRequest> req;
std::shared_ptr<net::PbConn> conn;
RsyncServerTaskArg(std::shared_ptr<RsyncService::RsyncRequest> _req, std::shared_ptr<net::PbConn> _conn)
: req(std::move(_req)), conn(std::move(_conn)) {}
};
class RsyncReader;
class RsyncServerThread;

class RsyncServer {
public:
RsyncServer(const std::set<std::string>& ips, const int port);
~RsyncServer();
void Schedule(net::TaskFunc func, void* arg);
int Start();
int Stop();
private:
std::unique_ptr<net::ThreadPool> work_thread_;
std::unique_ptr<RsyncServerThread> rsync_server_thread_;
};

class RsyncServerConn : public net::PbConn {
public:
RsyncServerConn(int connfd, const std::string& ip_port,
net::Thread* thread, void* worker_specific_data,
net::NetMultiplexer* mpx);
virtual ~RsyncServerConn() override;
int DealMessage() override;
static void HandleMetaRsyncRequest(void* arg);
static void HandleFileRsyncRequest(void* arg);
private:
void* data_ = nullptr;
};

class RsyncServerThread : public net::HolyThread {
public:
RsyncServerThread(const std::set<std::string>& ips, int port, int cron_internal, RsyncServer* arg);
~RsyncServerThread();

private:
class RsyncServerConnFactory : public net::ConnFactory {
public:
explicit RsyncServerConnFactory(RsyncServer* sched) : scheduler_(sched) {}

std::shared_ptr<net::NetConn> NewNetConn(int connfd, const std::string& ip_port,
net::Thread* thread, void* worker_specific_data,
net::NetMultiplexer* net) const override {
return std::static_pointer_cast<net::NetConn>(
std::make_shared<RsyncServerConn>(connfd, ip_port, thread, scheduler_, net));
}
private:
RsyncServer* scheduler_ = nullptr;
};
class RsyncServerHandle : public net::ServerHandle {
public:
void FdClosedHandle(int fd, const std::string& ip_port) const override;
void FdTimeoutHandle(int fd, const std::string& ip_port) const override;
bool AccessHandle(int fd, std::string& ip) const override;
void CronHandle() const override;
};
private:
RsyncServerConnFactory conn_factory_;
RsyncServerHandle handle_;
};

} //end namespace rsync
#endif

Loading

0 comments on commit 5c3e603

Please sign in to comment.