Skip to content

Commit

Permalink
revert_delete_sharding_mode (OpenAtomFoundation#1509)
Browse files Browse the repository at this point in the history
  • Loading branch information
chejinge authored May 17, 2023
1 parent acb8fa9 commit 99bd291
Show file tree
Hide file tree
Showing 16 changed files with 152 additions and 55 deletions.
15 changes: 0 additions & 15 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -745,19 +745,4 @@ target_link_libraries(${PROJECT_NAME}
${LIBUNWIND_LIBRARY}
${JEMALLOC_LIBRARY})

option(ENABLE_IPO "enable interprocedural optimization" ON)
if (ENABLE_IPO)
include(CheckIPOSupported)
check_ipo_supported(RESULT ipo_result OUTPUT ipo_output LANGUAGES CXX)

if (ipo_result)
set_property(TARGET ${PROJECT_NAME} PROPERTY INTERPROCEDURAL_OPTIMIZATION TRUE)
if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
target_link_libraries(${PROJECT_NAME} PUBLIC "-fuse-ld=lld")
endif ()
else ()
message(WARNING "IPO is not supported: ${ipo_output}")
endif ()
endif ()

option(USE_SSL "Enable SSL support" OFF)
1 change: 1 addition & 0 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
bool is_admin_require() const;
bool is_single_partition() const;
bool is_multi_partition() const;
bool is_classic_mode() const;
bool HashtagIsConsistent(const std::string& lhs, const std::string& rhs) const;
uint64_t GetDoDuration() const { return do_duration_; };

Expand Down
1 change: 1 addition & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return user_blacklist_;
}
bool classic_mode() { return classic_mode_.load(); }
int databases() {
std::shared_lock l(rwlock_);
return databases_;
Expand Down
27 changes: 20 additions & 7 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ void DbSlaveofCmd::DoInitial() {
res_.SetRes(CmdRes::kWrongNum, kCmdNameDbSlaveof);
return;
}
if (!g_pika_conf->classic_mode()) {
res_.SetRes(CmdRes::kErrOther, "DbSlaveof only support on classic mode");
return;
}
if (g_pika_server->role() ^ PIKA_ROLE_SLAVE || !g_pika_server->MetaSyncDone()) {
res_.SetRes(CmdRes::kErrOther, "Not currently a slave");
return;
Expand Down Expand Up @@ -414,9 +418,11 @@ void SelectCmd::DoInitial() {
res_.SetRes(CmdRes::kInvalidIndex, kCmdNameSelect);
return;
}
if (index < 0 || index >= g_pika_conf->databases()) {
res_.SetRes(CmdRes::kInvalidIndex, kCmdNameSelect + " DB index is out of range");
return;
if (g_pika_conf->classic_mode()) {
if (index < 0 || index >= g_pika_conf->databases()) {
res_.SetRes(CmdRes::kInvalidIndex, kCmdNameSelect + " DB index is out of range");
return;
}
}
table_name_ = "db" + argv_[1];
if (!g_pika_server->IsTableExist(table_name_)) {
Expand Down Expand Up @@ -922,6 +928,11 @@ void InfoCmd::InfoShardingReplication(std::string& info) {
}

void InfoCmd::InfoReplication(std::string& info) {
if (!g_pika_conf->classic_mode()) {
// In Sharding mode, show different replication info
InfoShardingReplication(info);
return;
}

int host_role = g_pika_server->role();
std::stringstream tmp_stream;
Expand Down Expand Up @@ -1347,16 +1358,16 @@ void ConfigCmd::ConfigGet(std::string& ret) {
if (pstd::stringmatch(pattern.data(), "instance-mode", 1)) {
elements += 2;
EncodeString(&config_body, "instance-mode");
EncodeString(&config_body, "classic");
EncodeString(&config_body, (g_pika_conf->classic_mode() ? "classic" : "sharding"));
}

if (pstd::stringmatch(pattern.data(), "databases", 1)) {
if (g_pika_conf->classic_mode() && pstd::stringmatch(pattern.data(), "databases", 1)) {
elements += 2;
EncodeString(&config_body, "databases");
EncodeInt32(&config_body, g_pika_conf->databases());
}

if (pstd::stringmatch(pattern.data(), "default-slot-num", 1)) {
if (!g_pika_conf->classic_mode() && pstd::stringmatch(pattern.data(), "default-slot-num", 1)) {
elements += 2;
EncodeString(&config_body, "default-slot-num");
EncodeInt32(&config_body, g_pika_conf->default_slot_num());
Expand Down Expand Up @@ -2329,7 +2340,9 @@ void HelloCmd::Do(std::shared_ptr<Partition> partition) {
};
// just for redis resp2 protocol
fvs.push_back({"proto", "2"});
fvs.push_back({"mode", "classic"});
if (g_pika_conf->classic_mode()) {
fvs.push_back({"mode", "classic"});
}
int host_role = g_pika_server->role();
switch (host_role) {
case PIKA_ROLE_SINGLE:
Expand Down
10 changes: 7 additions & 3 deletions src/pika_auxiliary_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@ PikaAuxiliaryThread::~PikaAuxiliaryThread() {

void* PikaAuxiliaryThread::ThreadMain() {
while (!should_stop()) {
if (g_pika_server->ShouldMetaSync()) {
g_pika_rm->SendMetaSyncRequest();
} else if (g_pika_server->MetaSyncDone()) {
if (g_pika_conf->classic_mode()) {
if (g_pika_server->ShouldMetaSync()) {
g_pika_rm->SendMetaSyncRequest();
} else if (g_pika_server->MetaSyncDone()) {
g_pika_rm->RunSyncSlavePartitionStateMachine();
}
} else {
g_pika_rm->RunSyncSlavePartitionStateMachine();
}

Expand Down
2 changes: 1 addition & 1 deletion src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ void PikaClientConn::ProcessSlowlog(const PikaCmdArgsType& argv, uint64_t start_

void PikaClientConn::ProcessMonitor(const PikaCmdArgsType& argv) {
std::string monitor_message;
std::string table_name = current_table_.substr(2);
std::string table_name = g_pika_conf->classic_mode() ? current_table_.substr(2) : current_table_;
monitor_message = std::to_string(1.0 * pstd::NowMicros() / 1000000) + " [" + table_name + " " + this->ip_port() + "]";
for (PikaCmdArgsType::const_iterator iter = argv.begin(); iter != argv.end(); iter++) {
monitor_message += " " + pstd::ToRead(*iter);
Expand Down
6 changes: 5 additions & 1 deletion src/pika_cmd_table_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ bool PikaCmdTableManager::CheckCurrentThreadDistributionMapExist(const std::thre
void PikaCmdTableManager::InsertCurrentThreadDistributionMap() {
auto tid = std::this_thread::get_id();
PikaDataDistribution* distribution = nullptr;
distribution = new HashModulo();
if (g_pika_conf->classic_mode()) {
distribution = new HashModulo();
} else {
distribution = new Crc32();
}
distribution->Init();
std::lock_guard l(map_protector_);
thread_distribution_map_.emplace(tid, distribution);
Expand Down
43 changes: 35 additions & 8 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ void Cmd::Execute() {
ProcessFlushAllCmd();
} else if (name_ == kCmdNameInfo || name_ == kCmdNameConfig) {
ProcessDoNotSpecifyPartitionCmd();
} else if (is_single_partition()) {
} else if (is_single_partition() || g_pika_conf->classic_mode()) {
ProcessSinglePartitionCmd();
} else if (is_multi_partition()) {
ProcessMultiPartitionCmd();
Expand Down Expand Up @@ -578,8 +578,18 @@ void Cmd::ProcessFlushAllCmd() {

void Cmd::ProcessSinglePartitionCmd() {
std::shared_ptr<Partition> partition;
//a table has only one partition
partition = g_pika_server->GetPartitionByDbName(table_name_);
if (g_pika_conf->classic_mode()) {
// in classic mode a table has only one partition
partition = g_pika_server->GetPartitionByDbName(table_name_);
} else {
std::vector<std::string> cur_key = current_key();
if (cur_key.empty()) {
res_.SetRes(CmdRes::kErrOther, "Internal Error");
return;
}
// in sharding mode we select partition by key
partition = g_pika_server->GetTablePartitionByKey(table_name_, cur_key.front());
}

if (!partition) {
res_.SetRes(CmdRes::kErrOther, "Partition not found");
Expand Down Expand Up @@ -612,7 +622,11 @@ void Cmd::InternalProcessCommand(std::shared_ptr<Partition> partition,
std::shared_ptr<SyncMasterPartition> sync_partition, const HintKeys& hint_keys) {
pstd::lock::MultiRecordLock record_lock(partition->LockMgr());
if (is_write()) {
record_lock.Lock(current_key());
if (!hint_keys.empty() && is_multi_partition() && !g_pika_conf->classic_mode()) {
record_lock.Lock(hint_keys.keys);
} else {
record_lock.Lock(current_key());
}
}

uint64_t start_us = 0;
Expand All @@ -626,15 +640,24 @@ void Cmd::InternalProcessCommand(std::shared_ptr<Partition> partition,
DoBinlog(sync_partition);

if (is_write()) {
record_lock.Unlock(current_key());
if (!hint_keys.empty() && is_multi_partition() && !g_pika_conf->classic_mode()) {
record_lock.Unlock(hint_keys.keys);
} else {
record_lock.Unlock(current_key());
}
}
}

void Cmd::DoCommand(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {
if (!is_suspend()) {
partition->DbRWLockReader();
}
Do(partition);

if (!hint_keys.empty() && is_multi_partition() && !g_pika_conf->classic_mode()) {
Split(partition, hint_keys);
} else {
Do(partition);
}

if (!is_suspend()) {
partition->DbRWUnLock();
Expand Down Expand Up @@ -733,9 +756,13 @@ bool Cmd::is_admin_require() const { return ((flag_ & kCmdFlagsMaskAdminRequire)
bool Cmd::is_single_partition() const { return ((flag_ & kCmdFlagsMaskPartition) == kCmdFlagsSinglePartition); }
bool Cmd::is_multi_partition() const { return ((flag_ & kCmdFlagsMaskPartition) == kCmdFlagsMultiPartition); }

bool Cmd::is_classic_mode() const { return g_pika_conf->classic_mode(); }

bool Cmd::HashtagIsConsistent(const std::string& lhs, const std::string& rhs) const {
if (GetHashkey(lhs) != GetHashkey(rhs)) {
return false;
if (is_classic_mode() == false) {
if (GetHashkey(lhs) != GetHashkey(rhs)) {
return false;
}
}
return true;
}
Expand Down
35 changes: 27 additions & 8 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -240,14 +240,33 @@ int PikaConf::Load() {
sync_thread_num_ = 24;
}

GetConfInt("databases", &databases_);
if (databases_ < 1 || databases_ > 8) {
LOG(FATAL) << "config databases error, limit [1 ~ 8], the actual is: " << databases_;
}
for (int idx = 0; idx < databases_; ++idx) {
table_structs_.push_back({"db" + std::to_string(idx), 1, {0}});
std::string instance_mode;
GetConfStr("instance-mode", &instance_mode);
classic_mode_.store(instance_mode.empty() || !strcasecmp(instance_mode.data(), "classic"));

if (classic_mode_.load()) {
GetConfInt("databases", &databases_);
if (databases_ < 1 || databases_ > 8) {
LOG(FATAL) << "config databases error, limit [1 ~ 8], the actual is: " << databases_;
}
for (int idx = 0; idx < databases_; ++idx) {
table_structs_.push_back({"db" + std::to_string(idx), 1, {0}});
}
} else {
GetConfInt("default-slot-num", &default_slot_num_);
if (default_slot_num_ <= 0) {
LOG(FATAL) << "config default-slot-num error,"
<< " it should greater than zero, the actual is: " << default_slot_num_;
}
std::string pika_meta_path = db_path_ + kPikaMeta;
if (!pstd::FileExists(pika_meta_path)) {
local_meta_->StableSave({{"db0", static_cast<uint32_t>(default_slot_num_), {}}});
}
Status s = local_meta_->ParseMeta(&table_structs_);
if (!s.ok()) {
LOG(FATAL) << "parse meta file error";
}
}

default_table_ = table_structs_[0].table_name;

int tmp_replication_num = 0;
Expand All @@ -266,7 +285,7 @@ int PikaConf::Load() {
<< " [0..." << replication_num_.load() << "]";
}
consensus_level_.store(tmp_consensus_level);
if ((consensus_level_.load() != 0 || replication_num_.load() != 0)) {
if (classic_mode_.load() && (consensus_level_.load() != 0 || replication_num_.load() != 0)) {
LOG(FATAL) << "consensus-level & replication-num only configurable under sharding mode,"
<< " set it to be 0 if you are using classic mode";
}
Expand Down
3 changes: 2 additions & 1 deletion src/pika_consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,8 @@ int MemLog::InternalFindLogByBinlogOffset(const LogOffset& offset) {
ConsensusCoordinator::ConsensusCoordinator(const std::string& table_name, uint32_t partition_id)
: table_name_(table_name), partition_id_(partition_id) {
std::string table_log_path = g_pika_conf->log_path() + "log_" + table_name + "/";
std::string log_path = table_log_path;
std::string log_path =
g_pika_conf->classic_mode() ? table_log_path : table_log_path + std::to_string(partition_id) + "/";
context_ = std::make_shared<Context>(log_path + kContext);
stable_logger_ = std::make_shared<StableLog>(table_name, partition_id, log_path);
mem_logger_ = std::make_shared<MemLog>();
Expand Down
17 changes: 11 additions & 6 deletions src/pika_partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,24 @@ std::string BgsaveSubPath(const std::string& table_name, uint32_t partition_id)
return std::string(buf);
}

std::string DbSyncPath(const std::string& sync_path, const std::string& table_name, const uint32_t partition_id) {
std::string DbSyncPath(const std::string& sync_path, const std::string& table_name, const uint32_t partition_id,
bool classic_mode) {
char buf[256];
std::string partition_id_str = std::to_string(partition_id);
snprintf(buf, sizeof(buf), "%s/", table_name.data());
if (classic_mode) {
snprintf(buf, sizeof(buf), "%s/", table_name.data());
} else {
snprintf(buf, sizeof(buf), "%s/%s/", table_name.data(), partition_id_str.data());
}
return sync_path + buf;
}

Partition::Partition(const std::string& table_name, uint32_t partition_id, const std::string& table_db_path)
: table_name_(table_name), partition_id_(partition_id), bgsave_engine_(nullptr) {
db_path_ = table_db_path;
bgsave_sub_path_ = table_name;
dbsync_path_ = DbSyncPath(g_pika_conf->db_sync_path(), table_name_, partition_id_);
partition_name_ = table_name ;
db_path_ = g_pika_conf->classic_mode() ? table_db_path : PartitionPath(table_db_path, partition_id_);
bgsave_sub_path_ = g_pika_conf->classic_mode() ? table_name : BgsaveSubPath(table_name_, partition_id_);
dbsync_path_ = DbSyncPath(g_pika_conf->db_sync_path(), table_name_, partition_id_, g_pika_conf->classic_mode());
partition_name_ = g_pika_conf->classic_mode() ? table_name : PartitionName(table_name_, partition_id_);

db_ = std::shared_ptr<storage::Storage>(new storage::Storage());
rocksdb::Status s = db_->Open(g_pika_server->storage_options(), db_path_);
Expand Down
4 changes: 2 additions & 2 deletions src/pika_repl_bgworker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) {
const InnerMessage::InnerResponse::BinlogSync& binlog_res = res->binlog_sync((*index)[i]);
// if pika are not current a slave or partition not in
// BinlogSync state, we drop remain write binlog task
if ((!(g_pika_server->role() & PIKA_ROLE_SLAVE)) ||
if ((g_pika_conf->classic_mode() && !(g_pika_server->role() & PIKA_ROLE_SLAVE)) ||
((slave_partition->State() != ReplState::kConnected) && (slave_partition->State() != ReplState::kWaitDBSync))) {
return;
}
Expand Down Expand Up @@ -209,7 +209,7 @@ int PikaReplBgWorker::HandleWriteBinlog(net::RedisParser* parser, const net::Red
// Monitor related
std::string monitor_message;
if (g_pika_server->HasMonitorClients()) {
std::string table_name = worker->table_name_.substr(2);
std::string table_name = g_pika_conf->classic_mode() ? worker->table_name_.substr(2) : worker->table_name_;
std::string monitor_message =
std::to_string(1.0 * pstd::NowMicros() / 1000000) + " [" + table_name + " " + worker->ip_port_ + "]";
for (const auto& item : argv) {
Expand Down
8 changes: 8 additions & 0 deletions src/pika_repl_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,14 @@ void PikaReplClientConn::HandleMetaSyncResponse(void* arg) {
}

const InnerMessage::InnerResponse_MetaSync meta_sync = response->meta_sync();
if (g_pika_conf->classic_mode() != meta_sync.classic_mode()) {
LOG(WARNING) << "Self in " << (g_pika_conf->classic_mode() ? "classic" : "sharding") << " mode, but master in "
<< (meta_sync.classic_mode() ? "classic" : "sharding")
<< " mode, failed to establish master-slave relationship";
g_pika_server->SyncError();
conn->NotifyClose();
return;
}

std::vector<TableStruct> master_table_structs;
for (int idx = 0; idx < meta_sync.tables_info_size(); ++idx) {
Expand Down
1 change: 1 addition & 0 deletions src/pika_repl_server_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ void PikaReplServerConn::HandleMetaSyncRequest(void* arg) {
g_pika_server->BecomeMaster();
response.set_code(InnerMessage::kOk);
InnerMessage::InnerResponse_MetaSync* meta_sync = response.mutable_meta_sync();
meta_sync->set_classic_mode(g_pika_conf->classic_mode());
for (const auto& table_struct : table_structs) {
InnerMessage::InnerResponse_MetaSync_TableInfo* table_info = meta_sync->add_tables_info();
table_info->set_table_name(table_struct.table_name);
Expand Down
6 changes: 5 additions & 1 deletion src/pika_rm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ SyncPartition::SyncPartition(const std::string& table_name, uint32_t partition_i
: partition_info_(table_name, partition_id) {}

std::string SyncPartition::PartitionName() {
return partition_info_.table_name_;
if (g_pika_conf->classic_mode()) {
return partition_info_.table_name_;
} else {
return partition_info_.ToString();
}
}

/* SyncMasterPartition*/
Expand Down
Loading

0 comments on commit 99bd291

Please sign in to comment.