Skip to content

Commit

Permalink
Feature/txn (#1585) (#2124)
Browse files Browse the repository at this point in the history
* Feature/txn (#1585)

* fix: fix select cmd return inconsistent with redis

Signed-off-by: Hao Lee <1838249551@qq.com>

* refactor:modified lock style while involve db level

Signed-off-by: Hao Lee <1838249551@qq.com>

* feature:txn basic

Signed-off-by: Hao Lee <1838249551@qq.com>

* fix:merge upstream

Signed-off-by: Hao Lee <1838249551@qq.com>

* feature:txn udpate

Signed-off-by: Hao Lee <1838249551@qq.com>

* feature:add txn for pika(#1446)

todo:test txn. Just to verify the feasibility of the program.
Signed-off-by: Hao Lee <1838249551@qq.com>

* update unwatch cmd

Add comments

Signed-off-by: Hao Lee <1838249551@qq.com>

* clear watched key when connection closed

Signed-off-by: Hao Lee <1838249551@qq.com>

* merge upstream code

Signed-off-by: Hao Lee <1838249551@qq.com>

* update

Signed-off-by: Hao Lee <1838249551@qq.com>

* feature: add txn for pika completely

Signed-off-by: Hao Lee <1838249551@qq.com>

* add set txn failed for modified watch key

Signed-off-by: Hao Lee <1838249551@qq.com>

* update:reduce the particle size of the lock in txn

Signed-off-by: Hao Lee <1838249551@qq.com>

* chore:remove redundant comment

Signed-off-by: Hao Lee <1838249551@qq.com>

* test:add go ci test for txn

Signed-off-by: Hao Lee <1838249551@qq.com>

* fix compile error for linux

Signed-off-by: Hao Lee <1838249551@qq.com>

* update txn go ci test

Signed-off-by: Hao Lee <1838249551@qq.com>

* update txn for block list pop command

Signed-off-by: Hao Lee <1838249551@qq.com>

* Improve blpop-related in Redis transactions

Signed-off-by: Hao Lee <1838249551@qq.com>

* blpop_txn_fix

* add some test for go test txn

Signed-off-by: Hao Lee <1838249551@qq.com>

* update txn integration test

Signed-off-by: Hao Lee <1838249551@qq.com>

* txn change class to struct

Signed-off-by: Hao Lee <1838249551@qq.com>

* txn:use weak ptr instead of shared ptr in Cmd

Signed-off-by: Hao Lee <1838249551@qq.com>

---------

Signed-off-by: Hao Lee <1838249551@qq.com>
Co-authored-by: cheniujh <1271435567@qq.com>

* FNT

* fix:txn compile error in ubuntu (#2128)

Signed-off-by: LeeHao <1838249551@qq.com>

* using func instead of class private member (#2130)

* using func instead of class private member

---------

Signed-off-by: Hao Lee <1838249551@qq.com>
Signed-off-by: LeeHao <1838249551@qq.com>
Co-authored-by: LeeHao <39085999+ForestLH@users.noreply.github.com>
Co-authored-by: cheniujh <1271435567@qq.com>
Co-authored-by: Xin.Zh <dragoncharlie@foxmail.com>
  • Loading branch information
4 people authored Nov 21, 2023
1 parent 4c140cb commit f1965de
Show file tree
Hide file tree
Showing 7 changed files with 23 additions and 17 deletions.
2 changes: 1 addition & 1 deletion include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
protected:
CmdRes res_;
PikaCmdArgsType argv_;
std::string db_name_{};
std::string db_name_;

std::weak_ptr<net::NetConn> conn_;
std::weak_ptr<std::string> resp_;
Expand Down
5 changes: 2 additions & 3 deletions include/pika_transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ class MultiCmd : public Cmd {
Cmd* Clone() override { return new MultiCmd(*this); }
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override {}
void Merge() override {}

private:
void DoInitial() override;

};

class ExecCmd : public Cmd {
Expand Down Expand Up @@ -56,6 +56,7 @@ class ExecCmd : public Cmd {
std::vector<CmdInfo> list_cmd_;
std::vector<std::string> keys_;
};

class DiscardCmd : public Cmd {
public:
DiscardCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {}
Expand All @@ -67,8 +68,6 @@ class DiscardCmd : public Cmd {
void DoInitial() override;
};



class WatchCmd : public Cmd {
public:
WatchCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {}
Expand Down
1 change: 1 addition & 0 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,7 @@ void FlushallCmd::FlushAllWithoutLock() {
res_.SetRes(CmdRes::kOk);
}
}

void FlushallCmd::DoWithoutLock(std::shared_ptr<Slot> slot) {
if (!slot) {
LOG(INFO) << "Flushall, but Slot not found";
Expand Down
6 changes: 5 additions & 1 deletion src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ void PikaClientConn::TryWriteResp() {
NotifyEpoll(true);
}
}

void PikaClientConn::PushCmdToQue(std::shared_ptr<Cmd> cmd) {
txn_cmd_que_.push(cmd);
}
Expand All @@ -287,6 +288,7 @@ bool PikaClientConn::IsTxnFailed() {
std::lock_guard<std::mutex> lg(txn_state_mu_);
return txn_state_[TxnStateBitMask::WatchFailed] | txn_state_[TxnStateBitMask::InitCmdFailed];
}

bool PikaClientConn::IsTxnInitFailed() {
std::lock_guard<std::mutex> lg(txn_state_mu_);
return txn_state_[TxnStateBitMask::InitCmdFailed];
Expand All @@ -306,6 +308,7 @@ void PikaClientConn::SetTxnWatchFailState(bool is_failed) {
std::lock_guard<std::mutex> lg(txn_state_mu_);
txn_state_[TxnStateBitMask::WatchFailed] = is_failed;
}

void PikaClientConn::SetTxnInitFailState(bool is_failed) {
std::lock_guard<std::mutex> lg(txn_state_mu_);
txn_state_[TxnStateBitMask::InitCmdFailed] = is_failed;
Expand Down Expand Up @@ -364,6 +367,7 @@ void PikaClientConn::SetAllTxnFailed() {
}
}
}

void PikaClientConn::SetTxnFailedFromDBs(std::string db_name) {
auto dispatcher = dynamic_cast<net::DispatchThread *>(server_thread());
if (dispatcher != nullptr) {
Expand All @@ -375,6 +379,7 @@ void PikaClientConn::SetTxnFailedFromDBs(std::string db_name) {
}
}
}

void PikaClientConn::ExitTxn() {
if (IsInTxn()) {
RemoveWatchedKeys();
Expand All @@ -384,7 +389,6 @@ void PikaClientConn::ExitTxn() {
}
}


void PikaClientConn::ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr<std::string>& resp_ptr) {
// get opt
std::string opt = argv[0];
Expand Down
3 changes: 2 additions & 1 deletion src/pika_slot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ bool Slot::FlushSubDB(const std::string& db_name) {
std::lock_guard rwl(db_rwlock_);
return FlushSubDBWithoutLock(db_name);
}

bool Slot::FlushSubDBWithoutLock(const std::string& db_name) {
std::lock_guard l(bgsave_protector_);
if (bgsave_info_.bgsaving) {
Expand All @@ -510,6 +511,7 @@ bool Slot::FlushSubDBWithoutLock(const std::string& db_name) {
g_pika_server->PurgeDir(del_dbpath);
return true;
}

void Slot::InitKeyScan() {
key_scan_info_.start_time = time(nullptr);
char s_time[32];
Expand Down Expand Up @@ -542,4 +544,3 @@ Status Slot::GetKeyNum(std::vector<storage::KeyInfo>* key_info) {
key_scan_info_.duration = static_cast<int32_t>(time(nullptr) - key_scan_info_.start_time);
return Status::OK();
}

5 changes: 3 additions & 2 deletions src/pika_transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#include "include/pika_transaction.h"
#include <memory>

#include "include/pika_transaction.h"
#include "include/pika_admin.h"
#include "include/pika_client_conn.h"
#include "include/pika_define.h"
#include "include/pika_list.h"
#include "include/pika_rm.h"
#include "include/pika_server.h"
#include "include/pika_transaction.h"

extern std::unique_ptr<PikaServer> g_pika_server;
extern std::unique_ptr<PikaReplicaManager> g_pika_rm;
Expand All @@ -37,7 +39,6 @@ void MultiCmd::DoInitial() {
}
}


void ExecCmd::Do(std::shared_ptr<Slot> slot) {
auto conn = GetConn();
auto client_conn = std::dynamic_pointer_cast<PikaClientConn>(conn);
Expand Down
18 changes: 9 additions & 9 deletions tests/integration/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ var _ = Describe("Text Txn", func() {
})
Describe("test watch", func() {
It("basic watch", func() {
txnClient.Watch(ctx, func(tx *redis.Tx) error { // 这个func相当于就是被一对watch和unwatch所包含了
txnClient.Watch(ctx, func(tx *redis.Tx) error { // including in a pair of watch and unwatch
pipe := tx.TxPipeline()
cmdClient.Set(ctx, "key", "1", 0)
pipe.Set(ctx, "key", "2", 0)
Expand Down Expand Up @@ -61,7 +61,7 @@ var _ = Describe("Text Txn", func() {
return nil
}, "key")
})
// 在事务中有另一个事务来使用flushdb清除db1的数据,不会影响到watch的这个db的key的事务执行
// Having another transaction in the transaction to clear the data in db1 using flushdb will not affect the transaction execution of the key in this db of watch
It("test watch1", func() {
watchKey := "key"
watchkeyValue := "value"
Expand All @@ -81,7 +81,7 @@ var _ = Describe("Text Txn", func() {
return nil
}, watchKey)
})
// 测试watch的key有多个类型
// multiple types of keys for testing watch
It("test watch multi type key", func() {
watchKey := "key"
watchkeyValue := "value"
Expand All @@ -94,7 +94,7 @@ var _ = Describe("Text Txn", func() {
}, watchKey)
Expect(err).To(HaveOccurred())
})
//// 测试flushall命令会使watch的key失败
// Testing the flushall command will cause watch's key to fail
It("txn failed cause of flushall", func() {
watchKey := "key"
watchkeyValue := "value"
Expand All @@ -114,7 +114,7 @@ var _ = Describe("Text Txn", func() {
return nil
}, watchKey)
})
// 测试select命令
// test 'select' command
It("select in txn", func() {
watchKey := "key"
noExist := "noExist"
Expand All @@ -126,20 +126,20 @@ var _ = Describe("Text Txn", func() {
Expect(intCmd.Err()).NotTo(HaveOccurred())

err := txnClient.Watch(ctx, func(tx *redis.Tx) error {
tx.Select(ctx, 1) // 这个是和txnClient.Watch使用的一个端口
tx.Select(ctx, 1) // this command used the same port with txnClient.Watch
tx.Watch(ctx, watchKey)
cmdClient.Set(ctx, watchKey, watchkeyValue, 0)
pipeline := tx.TxPipeline()
pipeline.Set(ctx, watchKey, modifiedValue, 0)
pipeline.Get(ctx, watchKey)
cmders, _ := pipeline.Exec(ctx) // 这个也是和txnClient.Watch使用的一个端口
cmders, _ := pipeline.Exec(ctx) // using the same port with txnClient.Watch
AssertEqualRedisString(modifiedValue, cmders[1])
return nil
}, noExist)
Expect(err).NotTo(HaveOccurred())
})

// 测试执行在事务中执行命令时不阻塞其他普通命令的执行
// The test execution does not block the execution of other ordinary commands when executing commands in transactions
It("test txn no block other cmd", func() {
pipe := txnClient.TxPipeline()
pipe.Get(ctx, "key")
Expand Down Expand Up @@ -183,7 +183,7 @@ var _ = Describe("Text Txn", func() {
})
})
Describe("Test Unwatch", func() {
// 测试unwatch的基本功能,unwatch之后事务应该不受影响
// test unwatch
It("unwatch1", func() {
watchKey := "key"
watchkeyValue := "value"
Expand Down

0 comments on commit f1965de

Please sign in to comment.