Skip to content

Commit

Permalink
fix ci replication go test failed problem (#2435)
Browse files Browse the repository at this point in the history
Co-authored-by: wuxianrong <wuxianrong@360.cn>
  • Loading branch information
Mixficsol and wuxianrong authored Mar 2, 2024
1 parent 5d5bfe9 commit 60718b1
Show file tree
Hide file tree
Showing 17 changed files with 92 additions and 90 deletions.
2 changes: 1 addition & 1 deletion tests/integration/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var _ = Describe("Cache test", func() {
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
})
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/csanning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ var _ = Describe("Csanning Commands", func() {
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
})
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/geo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var _ = Describe("Geo Commands", func() {
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
})
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/hash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var _ = Describe("Hash Commands", func() {
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
})
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/hyperloglog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var _ = Describe("Hyperloglog Commands", func() {
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
})
Expand Down
8 changes: 4 additions & 4 deletions tests/integration/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ var _ = Describe("List Commands", func() {
var blockedLock sync.Mutex

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
})
Expand Down Expand Up @@ -916,8 +916,8 @@ var _ = Describe("List Commands", func() {
Expect(lRange.Err()).NotTo(HaveOccurred())
Expect(lRange.Val()).To(Equal([]string{"two", "three"}))

err := client.Do(ctx, "LPOP", "list", 1, 2).Err()
Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'lpop' command")))
err := client.Do(ctx, "LPOP", "list", 1, 2).Err()
Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'lpop' command")))
})

It("should LPopCount", func() {
Expand Down Expand Up @@ -1162,7 +1162,7 @@ var _ = Describe("List Commands", func() {
Expect(lRange.Val()).To(Equal([]string{"one", "two"}))

err := client.Do(ctx, "RPOP", "list", 1, 2).Err()
Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'rpop' command")))
Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'rpop' command")))
})

It("should RPopCount", func() {
Expand Down
26 changes: 11 additions & 15 deletions tests/integration/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@ import (
"github.com/redis/go-redis/v9"
)

const (
LOCALHOST = "127.0.0.1"
SLAVEPORT = "9231"
MASTERPORT = "9241"
SINGLEADDR = "127.0.0.1:9221"
SLAVEADDR = "127.0.0.1:9231"
MASTERADDR = "127.0.0.1:9241"
)

type TimeValue struct {
time.Time
}
Expand All @@ -15,22 +24,9 @@ func (t *TimeValue) ScanRedis(s string) (err error) {
return
}

func pikaOptions1() *redis.Options {
return &redis.Options{
Addr: "127.0.0.1:9221",
DB: 0,
DialTimeout: 10 * time.Second,
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
MaxRetries: -1,
PoolSize: 30,
PoolTimeout: 60 * time.Second,
}
}

func pikaOptions2() *redis.Options {
func PikaOption(addr string) *redis.Options {
return &redis.Options{
Addr: "127.0.0.1:9231",
Addr: addr,
DB: 0,
DialTimeout: 10 * time.Second,
ReadTimeout: 30 * time.Second,
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ var _ = Describe("PubSub", func() {
ctx := context.TODO()

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client2 = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
client2 = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
Expect(client2.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(2 * time.Second)
Expand Down
8 changes: 4 additions & 4 deletions tests/integration/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,8 +371,8 @@ var _ = Describe("should replication ", func() {
var clientMaster *redis.Client

BeforeEach(func() {
clientMaster = redis.NewClient(pikaOptions1())
clientSlave = redis.NewClient(pikaOptions2())
clientMaster = redis.NewClient(PikaOption(MASTERADDR))
clientSlave = redis.NewClient(PikaOption(SLAVEADDR))
cleanEnv(ctx, clientMaster, clientSlave)
Expect(clientSlave.FlushDB(ctx).Err()).NotTo(HaveOccurred())
Expect(clientMaster.FlushDB(ctx).Err()).NotTo(HaveOccurred())
Expand All @@ -395,11 +395,11 @@ var _ = Describe("should replication ", func() {
infoRes = clientMaster.Info(ctx, "replication")
Expect(infoRes.Err()).NotTo(HaveOccurred())
Expect(infoRes.Val()).To(ContainSubstring("role:master"))
Expect(clientSlave.Do(ctx, "slaveof", "127.0.0.1", "9231").Err()).To(MatchError("ERR The master ip:port and the slave ip:port are the same"))
Expect(clientSlave.Do(ctx, "slaveof", LOCALHOST, SLAVEPORT).Err()).To(MatchError("ERR The master ip:port and the slave ip:port are the same"))

var count = 0
for {
res := trySlave(ctx, clientSlave, "127.0.0.1", "9221")
res := trySlave(ctx, clientSlave, LOCALHOST, MASTERPORT)
if res {
break
} else if count > 4 {
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var _ = Describe("Server", func() {
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
})
Expand Down
6 changes: 3 additions & 3 deletions tests/integration/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var _ = Describe("Set Commands", func() {
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
})
Expand Down Expand Up @@ -276,8 +276,8 @@ var _ = Describe("Set Commands", func() {
Expect(sMembers.Err()).NotTo(HaveOccurred())
Expect(sMembers.Val()).To(HaveLen(3))

err := client.Do(ctx, "SPOP", "set", 1, 2).Err()
Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'spop' command")))
err := client.Do(ctx, "SPOP", "set", 1, 2).Err()
Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'spop' command")))
})

It("should SPopN", func() {
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/slowlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ var _ = Describe("Slowlog Commands", func() {
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
})
Expand Down
9 changes: 8 additions & 1 deletion tests/integration/start_master_and_slave.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,18 @@
# This script is used by .github/workflows/pika.yml, Do not modify this file unless you know what you are doing.
# it's used to start pika master and slave, running path: build
cp ../../output/pika ./pika
cp ../conf/pika.conf ./pika_single.conf
cp ../conf/pika.conf ./pika_master.conf
cp ../conf/pika.conf ./pika_slave.conf
# Create folders for storing data on the primary and secondary nodes
mkdir master_data
mkdir slave_data
sed -i '' -e 's|databases : 1|databases : 2|' -e 's|#daemonize : yes|daemonize : yes|' ./pika_master.conf
# Example Change the location for storing data on primary and secondary nodes in the configuration file
sed -i '' -e 's|databases : 1|databases : 2|' -e 's|#daemonize : yes|daemonize : yes|' ./pika_single.conf
sed -i '' -e 's|databases : 1|databases : 2|' -e 's|port : 9221|port : 9241|' -e 's|log-path : ./log/|log-path : ./master_data/log/|' -e 's|db-path : ./db/|db-path : ./master_data/db/|' -e 's|dump-path : ./dump/|dump-path : ./master_data/dump/|' -e 's|pidfile : ./pika.pid|pidfile : ./master_data/pika.pid|' -e 's|db-sync-path : ./dbsync/|db-sync-path : ./master_data/dbsync/|' -e 's|#daemonize : yes|daemonize : yes|' ./pika_master.conf
sed -i '' -e 's|databases : 1|databases : 2|' -e 's|port : 9221|port : 9231|' -e 's|log-path : ./log/|log-path : ./slave_data/log/|' -e 's|db-path : ./db/|db-path : ./slave_data/db/|' -e 's|dump-path : ./dump/|dump-path : ./slave_data/dump/|' -e 's|pidfile : ./pika.pid|pidfile : ./slave_data/pika.pid|' -e 's|db-sync-path : ./dbsync/|db-sync-path : ./slave_data/dbsync/|' -e 's|#daemonize : yes|daemonize : yes|' ./pika_slave.conf
# Start three nodes
./pika -c ./pika_single.conf
./pika -c ./pika_master.conf
./pika -c ./pika_slave.conf
#ensure both master and slave are ready
Expand Down
97 changes: 48 additions & 49 deletions tests/integration/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@
package pika_integration

import (
"sync"
"context"
"sync/atomic"
"fmt"
"math/rand"
"strconv"
"strings"
"sync"
"sync/atomic"

. "github.com/bsm/ginkgo/v2"
. "github.com/bsm/gomega"
Expand Down Expand Up @@ -120,7 +120,7 @@ func parseStreamEntryID(id string) (ts int64, seqNum int64) {
var _ = Describe("Stream Commands", func() {
ctx := context.TODO()
var client *redis.Client
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
client.FlushDB(ctx)

BeforeEach(func() {
Expand All @@ -140,20 +140,20 @@ var _ = Describe("Stream Commands", func() {
const numWriters = 10
const numReaders = 10
const messagesPerWriter = 20

createClient := func() *redis.Client {
return redis.NewClient(pikaOptions1())
return redis.NewClient(PikaOption(SINGLEADDR))
}

var messageCount int32

// Start writer goroutines
for i := 0; i < numWriters; i++ {
go func(writerIndex int) {
defer GinkgoRecover()
writerClient := createClient()
defer writerClient.Close()

for j := 0; j < messagesPerWriter; j++ {
_, err := writerClient.XAdd(ctx, &redis.XAddArgs{
Stream: streamKey,
Expand All @@ -164,41 +164,42 @@ var _ = Describe("Stream Commands", func() {
}
}(i)
}

// Start reader goroutines
var wg sync.WaitGroup
for i := 0; i < numReaders; i++ {
wg.Add(1)
go func() {
defer GinkgoRecover()
defer wg.Done()
readerClient := createClient()
defer readerClient.Close()

lastID := "0"
readMessages := 0
for readMessages < totalMessages {
items, err := readerClient.XRead(ctx, &redis.XReadArgs{
Streams: []string{streamKey, lastID},
Block: 0,
}).Result()
if (err != nil) {
continue
}

// Check if items slice is not empty
if len(items) > 0 && len(items[0].Messages) > 0 {
lastMessageIndex := len(items[0].Messages) - 1
lastID = items[0].Messages[lastMessageIndex].ID
readMessages += len(items[0].Messages)
}
// Optionally add a short delay here if needed
}
Expect(readMessages).To(BeNumerically(">=", totalMessages))
wg.Add(1)
go func() {
readerClient := createClient()
defer func() {
GinkgoRecover()
wg.Done()
readerClient.Close()
}()

lastID := "0"
readMessages := 0
for readMessages < totalMessages {
items, err := readerClient.XRead(ctx, &redis.XReadArgs{
Streams: []string{streamKey, lastID},
Block: 0,
}).Result()
if err != nil {
continue
}

// Check if items slice is not empty
if len(items) > 0 && len(items[0].Messages) > 0 {
lastMessageIndex := len(items[0].Messages) - 1
lastID = items[0].Messages[lastMessageIndex].ID
readMessages += len(items[0].Messages)
}
// Optionally add a short delay here if needed
}
Expect(readMessages).To(BeNumerically(">=", totalMessages))
}()
}


wg.Wait()
Eventually(func() int32 {
return atomic.LoadInt32(&messageCount)
Expand All @@ -209,29 +210,27 @@ var _ = Describe("Stream Commands", func() {
Expect(client.Del(ctx, "mystream").Err()).NotTo(HaveOccurred())
// Creating a stream and adding entries
_, err := client.XAdd(ctx, &redis.XAddArgs{
Stream: "mystream",
ID: "*",
Values: map[string]interface{}{"key1": "value1", "key2": "value2"},
Stream: "mystream",
ID: "*",
Values: map[string]interface{}{"key1": "value1", "key2": "value2"},
}).Result()
Expect(err).NotTo(HaveOccurred())

// Using keys * to find all keys including the stream
keys, err := client.Keys(ctx, "*").Result()
Expect(err).NotTo(HaveOccurred())

// Checking if the stream 'mystream' exists in the returned keys
found := false
for _, key := range keys {
if key == "mystream" {
found = true
break
}
if key == "mystream" {
found = true
break
}
}
Expect(found).To(BeTrue(), "Stream 'mystream' should exist in keys")
})




It("XADD wrong number of args", func() {
_, err := client.Do(ctx, "XADD", "mystream").Result()
Expect(err).To(HaveOccurred())
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/string_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var _ = Describe("String Commands", func() {
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
})
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ var _ = Describe("Text Txn", func() {
var cmdCost time.Duration

BeforeEach(func() {
txnClient = redis.NewClient(pikaOptions1())
cmdClient = redis.NewClient(pikaOptions1())
txnClient = redis.NewClient(PikaOption(SINGLEADDR))
cmdClient = redis.NewClient(PikaOption(SINGLEADDR))
})
Describe("test watch", func() {
It("basic watch", func() {
Expand Down
Loading

0 comments on commit 60718b1

Please sign in to comment.