|
7 | 7 | "net"
|
8 | 8 | "strconv"
|
9 | 9 | "sync"
|
| 10 | + "testing" |
10 | 11 | "time"
|
11 | 12 |
|
12 | 13 | . "github.com/onsi/ginkgo"
|
@@ -739,3 +740,71 @@ var _ = Describe("Ring Tx timeout", func() {
|
739 | 740 | testTimeout()
|
740 | 741 | })
|
741 | 742 | })
|
| 743 | + |
| 744 | +type fixedHash string |
| 745 | + |
| 746 | +func (h fixedHash) Get(string) string { |
| 747 | + return string(h) |
| 748 | +} |
| 749 | + |
| 750 | +func TestRingSetAddrsAndRebalanceRace(t *testing.T) { |
| 751 | + const ( |
| 752 | + ringShard1Name = "ringShardOne" |
| 753 | + ringShard2Name = "ringShardTwo" |
| 754 | + ) |
| 755 | + |
| 756 | + ring := redis.NewRing(&redis.RingOptions{ |
| 757 | + Addrs: map[string]string{ |
| 758 | + ringShard1Name: ":" + ringShard1Port, |
| 759 | + }, |
| 760 | + // Disable heartbeat |
| 761 | + HeartbeatFrequency: 1 * time.Hour, |
| 762 | + NewConsistentHash: func(shards []string) redis.ConsistentHash { |
| 763 | + switch len(shards) { |
| 764 | + case 1: |
| 765 | + return fixedHash(ringShard1Name) |
| 766 | + case 2: |
| 767 | + return fixedHash(ringShard2Name) |
| 768 | + default: |
| 769 | + t.Fatalf("Unexpected number of shards: %v", shards) |
| 770 | + return nil |
| 771 | + } |
| 772 | + }, |
| 773 | + }) |
| 774 | + |
| 775 | + // Continuously update addresses by adding and removing one address |
| 776 | + updatesDone := make(chan struct{}) |
| 777 | + defer func() { close(updatesDone) }() |
| 778 | + go func() { |
| 779 | + for i := 0; ; i++ { |
| 780 | + select { |
| 781 | + case <-updatesDone: |
| 782 | + return |
| 783 | + default: |
| 784 | + if i%2 == 0 { |
| 785 | + ring.SetAddrs(map[string]string{ |
| 786 | + ringShard1Name: ":" + ringShard1Port, |
| 787 | + }) |
| 788 | + } else { |
| 789 | + ring.SetAddrs(map[string]string{ |
| 790 | + ringShard1Name: ":" + ringShard1Port, |
| 791 | + ringShard2Name: ":" + ringShard2Port, |
| 792 | + }) |
| 793 | + } |
| 794 | + } |
| 795 | + } |
| 796 | + }() |
| 797 | + |
| 798 | + timer := time.NewTimer(1 * time.Second) |
| 799 | + for running := true; running; { |
| 800 | + select { |
| 801 | + case <-timer.C: |
| 802 | + running = false |
| 803 | + default: |
| 804 | + shard, err := ring.ShardByKey("whatever") |
| 805 | + if err == nil && shard == nil { |
| 806 | + t.Fatal("shard is nil") |
| 807 | + } |
| 808 | + } |
| 809 | + } |
| 810 | +} |
0 commit comments