Skip to content

Commit

Permalink
if client current attachment table is only one partition, it can exec…
Browse files Browse the repository at this point in the history
… command which hold the kCmdFlagsMultiPartition mask (#459)
  • Loading branch information
Axlgrep committed May 15, 2019
1 parent 404598a commit 524d2b3
Show file tree
Hide file tree
Showing 22 changed files with 98 additions and 68 deletions.
5 changes: 4 additions & 1 deletion include/pika_bit.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@

#ifndef PIKA_BIT_H_
#define PIKA_BIT_H_
#include "include/pika_command.h"

#include "blackwidow/blackwidow.h"

#include "include/pika_command.h"
#include "include/pika_partition.h"


/*
* bitoperation
Expand Down
5 changes: 4 additions & 1 deletion include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,10 @@ class Cmd {
virtual ~Cmd() {}

virtual std::string current_key() const;
virtual void Process();
virtual void Execute();
virtual void ProcessSinglePartitionCmd();
virtual void ProcessMultiPartitionCmd();
virtual void ProcessDoNotSpecifyPartitionCmd();
virtual void Do(std::shared_ptr<Partition> partition = nullptr) = 0;

void Initial(const PikaCmdArgsType& argv,
Expand Down
1 change: 1 addition & 0 deletions include/pika_geo.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#ifndef PIKA_GEO_H_
#define PIKA_GEO_H_
#include "include/pika_command.h"
#include "include/pika_partition.h"

/*
* zset
Expand Down
1 change: 1 addition & 0 deletions include/pika_hyperloglog.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#define PIKA_HYPERLOGLOG_H_

#include "include/pika_command.h"
#include "include/pika_partition.h"

/*
* hyperloglog
Expand Down
5 changes: 4 additions & 1 deletion include/pika_kv.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@

#ifndef PIKA_KV_H_
#define PIKA_KV_H_
#include "include/pika_command.h"

#include "blackwidow/blackwidow.h"

#include "include/pika_command.h"
#include "include/pika_partition.h"


/*
* kv
Expand Down
5 changes: 4 additions & 1 deletion include/pika_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@

#ifndef PIKA_LIST_H_
#define PIKA_LIST_H_
#include "include/pika_command.h"

#include "blackwidow/blackwidow.h"

#include "include/pika_command.h"
#include "include/pika_partition.h"


/*
* list
Expand Down
1 change: 1 addition & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ class PikaServer {
bool IsTableExist(const std::string& table_name);
bool IsTableSupportCommand(const std::string& table_name,
const std::string& command);
uint32_t GetPartitionNumByTable(const std::string& table_name);
std::shared_ptr<Partition> GetTablePartitionById(
const std::string& table_name,
uint32_t partition_id);
Expand Down
4 changes: 4 additions & 0 deletions include/pika_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@

#ifndef PIKA_SET_H_
#define PIKA_SET_H_

#include "include/pika_command.h"
#include "include/pika_partition.h"



/*
* set
Expand Down
1 change: 1 addition & 0 deletions include/pika_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class Table {

bool IsCommandSupport(const std::string& cmd) const;
bool IsBinlogIoError() ;
uint32_t PartitionNum();

std::shared_ptr<Partition> GetPartitionById(uint32_t partition_id);
std::shared_ptr<Partition> GetPartitionByKey(const std::string& key);
Expand Down
4 changes: 3 additions & 1 deletion include/pika_zset.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@

#ifndef PIKA_ZSET_H_
#define PIKA_ZSET_H_
#include "include/pika_command.h"
#include "blackwidow/blackwidow.h"

#include "include/pika_command.h"
#include "include/pika_partition.h"


/*
* zset
Expand Down
2 changes: 1 addition & 1 deletion src/pika_binlog_bgworker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ void BinlogBGWorker::DoBinlogBG(void* arg) {
}

if (!error_happend) {
c_ptr->Do();
c_ptr->Execute();
}

if (!c_ptr->is_suspend()) {
Expand Down
6 changes: 2 additions & 4 deletions src/pika_bit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@

#include <limits>
#include "slash/include/slash_string.h"
#include "include/pika_bit.h"
#include "include/pika_server.h"

extern PikaServer *g_pika_server;
#include "include/pika_bit.h"

void BitSetCmd::DoInitial() {
if (!CheckArg(argv_.size())) {
Expand Down Expand Up @@ -213,7 +211,7 @@ void BitOpCmd::DoInitial() {

void BitOpCmd::Do(std::shared_ptr<Partition> partition) {
int64_t result_length;
rocksdb::Status s = g_pika_server->db()->BitOp(op_, dest_key_, src_keys_, &result_length);
rocksdb::Status s = partition->db()->BitOp(op_, dest_key_, src_keys_, &result_length);
if (s.ok()) {
res_.AppendInteger(result_length);
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ std::string PikaClientConn::DoCmd(const PikaCmdArgsType& argv,
}

// Process Command
c_ptr->Process();
c_ptr->Execute();

if (g_pika_conf->slowlog_slower_than() >= 0) {
int32_t start_time = start_us / 1000000;
Expand Down
39 changes: 28 additions & 11 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -472,20 +472,37 @@ std::string Cmd::current_key() const {
return "";
}

void Cmd::Process() {
if (is_single_partition()) {
std::shared_ptr<Partition> partition =
g_pika_server->GetTablePartitionByKey(table_name_, current_key());
if (!partition) {
res_.SetRes(CmdRes::kErrOther, "Partition not found");
} else {
partition->DoCommand(this);
}
void Cmd::Execute() {
if (is_single_partition()
|| g_pika_server->GetPartitionNumByTable(table_name_) == 1) {
ProcessSinglePartitionCmd();
} else if (is_multi_partition()) {
Do();
ProcessMultiPartitionCmd();
} else {
ProcessDoNotSpecifyPartitionCmd();
}
}

void Cmd::ProcessSinglePartitionCmd() {
std::shared_ptr<Partition> partition;
if (is_single_partition()) {
partition = g_pika_server->GetTablePartitionByKey(table_name_, current_key());
} else {
Do();
partition = g_pika_server->GetTablePartitionById(table_name_, 0);
}

if (!partition) {
res_.SetRes(CmdRes::kErrOther, "Partition not found");
return;
}
partition->DoCommand(this);
}

void Cmd::ProcessMultiPartitionCmd() {
}

void Cmd::ProcessDoNotSpecifyPartitionCmd() {
Do();
}


Expand Down
15 changes: 6 additions & 9 deletions src/pika_geo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,8 @@

#include "slash/include/slash_string.h"
#include "include/pika_geo.h"
#include "include/pika_server.h"
#include "include/pika_geohash_helper.h"

extern PikaServer *g_pika_server;

void GeoAddCmd::DoInitial() {
if (!CheckArg(argv_.size())) {
res_.SetRes(CmdRes::kWrongNum, kCmdNameGeoAdd);
Expand Down Expand Up @@ -252,7 +249,7 @@ static bool sort_distance_desc(const NeighborPoint & pos1, const NeighborPoint &
return pos1.distance > pos2.distance;
}

static void GetAllNeighbors(std::string & key, GeoRange & range, CmdRes & res) {
static void GetAllNeighbors(std::shared_ptr<Partition> partition, std::string & key, GeoRange & range, CmdRes & res) {
rocksdb::Status s;
double longitude = range.longitude, latitude = range.latitude, distance = range.distance;
int count_limit = 0;
Expand Down Expand Up @@ -298,7 +295,7 @@ static void GetAllNeighbors(std::string & key, GeoRange & range, CmdRes & res) {
continue;
}
std::vector<blackwidow::ScoreMember> score_members;
s = g_pika_server->db()->ZRangebyscore(key, (double)min, (double)max, true, true, &score_members);
s = partition->db()->ZRangebyscore(key, (double)min, (double)max, true, true, &score_members);
if (!s.ok() && !s.IsNotFound()) {
res.SetRes(CmdRes::kErrOther, s.ToString());
return;
Expand Down Expand Up @@ -341,7 +338,7 @@ static void GetAllNeighbors(std::string & key, GeoRange & range, CmdRes & res) {
score_members.push_back({score, result[i].member});
}
int32_t count = 0;
s = g_pika_server->db()->ZAdd(range.storekey, score_members, &count);
s = partition->db()->ZAdd(range.storekey, score_members, &count);
if (!s.ok()) {
res.SetRes(CmdRes::kErrOther, s.ToString());
return;
Expand Down Expand Up @@ -468,7 +465,7 @@ void GeoRadiusCmd::DoInitial() {
}

void GeoRadiusCmd::Do(std::shared_ptr<Partition> partition) {
GetAllNeighbors(key_, range_, this->res_);
GetAllNeighbors(partition, key_, range_, this->res_);
}

void GeoRadiusByMemberCmd::DoInitial() {
Expand Down Expand Up @@ -541,13 +538,13 @@ void GeoRadiusByMemberCmd::DoInitial() {

void GeoRadiusByMemberCmd::Do(std::shared_ptr<Partition> partition) {
double score;
rocksdb::Status s = g_pika_server->db()->ZScore(key_, range_.member, &score);
rocksdb::Status s = partition->db()->ZScore(key_, range_.member, &score);
if (s.ok()) {
double xy[2];
GeoHashBits hash = { .bits = (uint64_t)score, .step = GEO_STEP_MAX };
geohashDecodeToLongLatWGS84(hash, xy);
range_.longitude = xy[0];
range_.latitude = xy[1];
}
GetAllNeighbors(key_, range_, this->res_);
GetAllNeighbors(partition, key_, range_, this->res_);
}
8 changes: 3 additions & 5 deletions src/pika_hyperloglog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@

#include <vector>
#include "slash/include/slash_string.h"
#include "include/pika_server.h"
#include "include/pika_hyperloglog.h"

extern PikaServer *g_pika_server;
#include "include/pika_hyperloglog.h"

void PfAddCmd::DoInitial() {
if (!CheckArg(argv_.size())) {
Expand Down Expand Up @@ -49,7 +47,7 @@ void PfCountCmd::DoInitial() {

void PfCountCmd::Do(std::shared_ptr<Partition> partition) {
int64_t value_ = 0;
rocksdb::Status s = g_pika_server->db()->PfCount(keys_, &value_);
rocksdb::Status s = partition->db()->PfCount(keys_, &value_);
if (s.ok()) {
res_.AppendInteger(value_);
} else {
Expand All @@ -69,7 +67,7 @@ void PfMergeCmd::DoInitial() {
}

void PfMergeCmd::Do(std::shared_ptr<Partition> partition) {
rocksdb::Status s = g_pika_server->db()->PfMerge(keys_);
rocksdb::Status s = partition->db()->PfMerge(keys_);
if (s.ok()) {
res_.SetRes(CmdRes::kOk);
} else {
Expand Down
23 changes: 10 additions & 13 deletions src/pika_kv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@

#include "slash/include/slash_string.h"
#include "include/pika_kv.h"
#include "include/pika_server.h"

extern PikaServer *g_pika_server;

/* SET key value [NX] [XX] [EX <seconds>] [PX <milliseconds>] */
void SetCmd::DoInitial() {
Expand Down Expand Up @@ -174,7 +171,7 @@ void DelCmd::DoInitial() {

void DelCmd::Do(std::shared_ptr<Partition> partition) {
std::map<blackwidow::DataType, blackwidow::Status> type_status;
int64_t count = g_pika_server->db()->Del(keys_, &type_status);
int64_t count = partition->db()->Del(keys_, &type_status);
if (count >= 0) {
res_.AppendInteger(count);
} else {
Expand Down Expand Up @@ -530,7 +527,7 @@ void MgetCmd::DoInitial() {

void MgetCmd::Do(std::shared_ptr<Partition> partition) {
std::vector<blackwidow::ValueStatus> vss;
rocksdb::Status s = g_pika_server->db()->MGet(keys_, &vss);
rocksdb::Status s = partition->db()->MGet(keys_, &vss);
if (s.ok()) {
res_.AppendArrayLen(vss.size());
for (const auto& vs : vss) {
Expand Down Expand Up @@ -572,7 +569,7 @@ void KeysCmd::DoInitial() {

void KeysCmd::Do(std::shared_ptr<Partition> partition) {
std::vector<std::string> keys;
rocksdb::Status s = g_pika_server->db()->Keys(type_, pattern_, &keys);
rocksdb::Status s = partition->db()->Keys(type_, pattern_, &keys);
res_.AppendArrayLen(keys.size());
for (const auto& key : keys) {
res_.AppendString(key);
Expand Down Expand Up @@ -796,7 +793,7 @@ void MsetCmd::DoInitial() {
}

void MsetCmd::Do(std::shared_ptr<Partition> partition) {
blackwidow::Status s = g_pika_server->db()->MSet(kvs_);
blackwidow::Status s = partition->db()->MSet(kvs_);
if (s.ok()) {
res_.SetRes(CmdRes::kOk);
} else {
Expand All @@ -823,7 +820,7 @@ void MsetnxCmd::DoInitial() {

void MsetnxCmd::Do(std::shared_ptr<Partition> partition) {
success_ = 0;
rocksdb::Status s = g_pika_server->db()->MSetnx(kvs_, &success_);
rocksdb::Status s = partition->db()->MSetnx(kvs_, &success_);
if (s.ok()) {
res_.AppendInteger(success_);
} else {
Expand Down Expand Up @@ -916,7 +913,7 @@ void ExistsCmd::DoInitial() {

void ExistsCmd::Do(std::shared_ptr<Partition> partition) {
std::map<blackwidow::DataType, rocksdb::Status> type_status;
int64_t res = g_pika_server->db()->Exists(keys_, &type_status);
int64_t res = partition->db()->Exists(keys_, &type_status);
if (res != -1) {
res_.AppendInteger(res);
} else {
Expand Down Expand Up @@ -1296,7 +1293,7 @@ void ScanCmd::DoInitial() {

void ScanCmd::Do(std::shared_ptr<Partition> partition) {
std::vector<std::string> keys;
int64_t cursor_ret = g_pika_server->db()->Scan(cursor_, pattern_, count_, &keys);
int64_t cursor_ret = partition->db()->Scan(cursor_, pattern_, count_, &keys);

res_.AppendArrayLen(2);

Expand Down Expand Up @@ -1363,7 +1360,7 @@ void ScanxCmd::DoInitial() {
void ScanxCmd::Do(std::shared_ptr<Partition> partition) {
std::string next_key;
std::vector<std::string> keys;
rocksdb::Status s = g_pika_server->db()->Scanx(type_, start_key_, pattern_, count_, &keys, &next_key);
rocksdb::Status s = partition->db()->Scanx(type_, start_key_, pattern_, count_, &keys, &next_key);

if (s.ok()) {
res_.AppendArrayLen(2);
Expand Down Expand Up @@ -1460,7 +1457,7 @@ void PKScanRangeCmd::Do(std::shared_ptr<Partition> partition) {
std::string next_key;
std::vector<std::string> keys;
std::vector<blackwidow::KeyValue> kvs;
rocksdb::Status s = g_pika_server->db()->PKScanRange(type_, key_start_, key_end_, pattern_, limit_, &keys, &kvs, &next_key);
rocksdb::Status s = partition->db()->PKScanRange(type_, key_start_, key_end_, pattern_, limit_, &keys, &kvs, &next_key);

if (s.ok()) {
res_.AppendArrayLen(2);
Expand Down Expand Up @@ -1541,7 +1538,7 @@ void PKRScanRangeCmd::Do(std::shared_ptr<Partition> partition) {
std::string next_key;
std::vector<std::string> keys;
std::vector<blackwidow::KeyValue> kvs;
rocksdb::Status s = g_pika_server->db()->PKRScanRange(type_, key_start_, key_end_, pattern_, limit_, &keys, &kvs, &next_key);
rocksdb::Status s = partition->db()->PKRScanRange(type_, key_start_, key_end_, pattern_, limit_, &keys, &kvs, &next_key);

if (s.ok()) {
res_.AppendArrayLen(2);
Expand Down
Loading

0 comments on commit 524d2b3

Please sign in to comment.