@@ -11,12 +11,15 @@ import (
1111)
1212
1313var ctx = context .Background ()
14- var consStopped = false
1514
15+ // This example is not supposed to be run as is. It is just a test to see how pubsub behaves in relation to pool management.
16+ // It was used to find regressions in pool management in hitless mode.
17+ // Please don't use it as a reference for how to use pubsub.
1618func main () {
1719 wg := & sync.WaitGroup {}
1820 rdb := redis .NewClient (& redis.Options {
19- Addr : ":6379" ,
21+ Addr : ":6379" ,
22+ HitlessUpgrades : true ,
2023 })
2124 _ = rdb .FlushDB (ctx ).Err ()
2225
@@ -30,21 +33,22 @@ func main() {
3033 if err != nil {
3134 panic (err )
3235 }
33- if err := rdb .Set (ctx , "prods " , "0" , 0 ).Err (); err != nil {
36+ if err := rdb .Set (ctx , "publishers " , "0" , 0 ).Err (); err != nil {
3437 panic (err )
3538 }
36- if err := rdb .Set (ctx , "cons " , "0" , 0 ).Err (); err != nil {
39+ if err := rdb .Set (ctx , "subscribers " , "0" , 0 ).Err (); err != nil {
3740 panic (err )
3841 }
39- if err := rdb .Set (ctx , "cntr " , "0" , 0 ).Err (); err != nil {
42+ if err := rdb .Set (ctx , "published " , "0" , 0 ).Err (); err != nil {
4043 panic (err )
4144 }
42- if err := rdb .Set (ctx , "recs " , "0" , 0 ).Err (); err != nil {
45+ if err := rdb .Set (ctx , "received " , "0" , 0 ).Err (); err != nil {
4346 panic (err )
4447 }
45- fmt .Println ("cntr " , rdb .Get (ctx , "cntr " ).Val ())
46- fmt .Println ("recs " , rdb .Get (ctx , "recs " ).Val ())
48+ fmt .Println ("published " , rdb .Get (ctx , "published " ).Val ())
49+ fmt .Println ("received " , rdb .Get (ctx , "received " ).Val ())
4750 subCtx , cancelSubCtx := context .WithCancel (ctx )
51+ pubCtx , cancelPublishers := context .WithCancel (ctx )
4852 for i := 0 ; i < 10 ; i ++ {
4953 wg .Add (1 )
5054 go subscribe (subCtx , rdb , "test" , i , wg )
@@ -54,32 +58,39 @@ func main() {
5458 time .Sleep (time .Second )
5559 subCtx , cancelSubCtx = context .WithCancel (ctx )
5660 for i := 0 ; i < 10 ; i ++ {
57- if err := rdb .Incr (ctx , "prods " ).Err (); err != nil {
61+ if err := rdb .Incr (ctx , "publishers " ).Err (); err != nil {
5862 panic (err )
5963 }
6064 wg .Add (1 )
61- go floodThePool (subCtx , rdb , wg )
65+ go floodThePool (pubCtx , rdb , wg )
6266 }
6367
6468 for i := 0 ; i < 500 ; i ++ {
65- if err := rdb .Incr (ctx , "cons " ).Err (); err != nil {
69+ if err := rdb .Incr (ctx , "subscribers " ).Err (); err != nil {
6670 panic (err )
6771 }
6872 wg .Add (1 )
6973 go subscribe (subCtx , rdb , "test2" , i , wg )
7074 }
75+ time .Sleep (5 * time .Second )
76+ fmt .Println ("canceling publishers" )
77+ cancelPublishers ()
7178 time .Sleep (10 * time .Second )
72- fmt .Println ("canceling" )
79+ fmt .Println ("canceling subscribers " )
7380 cancelSubCtx ()
7481 wg .Wait ()
75- cntr , err := rdb .Get (ctx , "cntr" ).Result ()
76- recs , err := rdb .Get (ctx , "recs" ).Result ()
77- prods , err := rdb .Get (ctx , "prods" ).Result ()
78- cons , err := rdb .Get (ctx , "cons" ).Result ()
79- fmt .Printf ("cntr: %s\n " , cntr )
80- fmt .Printf ("recs: %s\n " , recs )
81- fmt .Printf ("prods: %s\n " , prods )
82- fmt .Printf ("cons: %s\n " , cons )
82+ published , err := rdb .Get (ctx , "published" ).Result ()
83+ received , err := rdb .Get (ctx , "received" ).Result ()
84+ publishers , err := rdb .Get (ctx , "publishers" ).Result ()
85+ subscribers , err := rdb .Get (ctx , "subscribers" ).Result ()
86+ fmt .Printf ("publishers: %s\n " , publishers )
87+ fmt .Printf ("published: %s\n " , published )
88+ fmt .Printf ("subscribers: %s\n " , subscribers )
89+ fmt .Printf ("received: %s\n " , received )
90+ publishedInt , err := rdb .Get (ctx , "published" ).Int ()
91+ subscribersInt , err := rdb .Get (ctx , "subscribers" ).Int ()
92+ fmt .Printf ("if drained = published*subscribers: %d\n " , publishedInt * subscribersInt )
93+
8394 time .Sleep (2 * time .Second )
8495}
8596
@@ -88,8 +99,6 @@ func floodThePool(ctx context.Context, rdb *redis.Client, wg *sync.WaitGroup) {
8899 for {
89100 select {
90101 case <- ctx .Done ():
91- fmt .Println ("floodThePool stopping" )
92- consStopped = true
93102 return
94103 default :
95104 }
@@ -99,7 +108,7 @@ func floodThePool(ctx context.Context, rdb *redis.Client, wg *sync.WaitGroup) {
99108 //log.Println("publish error:", err)
100109 }
101110
102- err = rdb .Incr (ctx , "cntr " ).Err ()
111+ err = rdb .Incr (ctx , "published " ).Err ()
103112 if err != nil {
104113 // noop
105114 //log.Println("incr error:", err)
@@ -110,36 +119,24 @@ func floodThePool(ctx context.Context, rdb *redis.Client, wg *sync.WaitGroup) {
110119
111120func subscribe (ctx context.Context , rdb * redis.Client , topic string , subscriberId int , wg * sync.WaitGroup ) {
112121 defer wg .Done ()
113- defer fmt .Printf ("subscriber %d stopping\n " , subscriberId )
114122 rec := rdb .Subscribe (ctx , topic )
115123 recChan := rec .Channel ()
116124 for {
117125 select {
118126 case <- ctx .Done ():
119127 rec .Close ()
120- if subscriberId == 199 {
121- fmt .Printf ("subscriber %d done\n " , subscriberId )
122- }
123128 return
124129 default :
125130 select {
126131 case <- ctx .Done ():
127132 rec .Close ()
128- if subscriberId == 199 {
129- fmt .Printf ("subscriber %d done\n " , subscriberId )
130- }
131133 return
132134 case msg := <- recChan :
133- err := rdb .Incr (ctx , "recs " ).Err ()
135+ err := rdb .Incr (ctx , "received " ).Err ()
134136 if err != nil {
135137 log .Println ("incr error:" , err )
136138 }
137- if consStopped {
138- fmt .Printf ("subscriber %d received %s\n " , subscriberId , msg .Payload )
139- }
140- if subscriberId == 199 {
141- fmt .Printf ("subscriber %d received %s\n " , subscriberId , msg .Payload )
142- }
139+ _ = msg // Use the message to avoid unused variable warning
143140 }
144141 }
145142 }
0 commit comments