@@ -3,6 +3,7 @@ package pool
33import (
44 "context"
55 "errors"
6+ "fmt"
67 "log"
78 "net"
89 "sync"
@@ -102,7 +103,7 @@ type ConnPool struct {
102103 conns []* Conn
103104 idleConns []* Conn
104105
105- poolSize int
106+ poolSize atomic. Int32
106107 idleConnsLen int
107108
108109 stats Stats
@@ -140,16 +141,16 @@ func (p *ConnPool) checkMinIdleConns() {
140141
141142 // Only create idle connections if we haven't reached the total pool size limit
142143 // MinIdleConns should be a subset of PoolSize, not additional connections
143- for p .poolSize < p .cfg .PoolSize && p .idleConnsLen < p .cfg .MinIdleConns {
144+ for p .poolSize . Load () < int32 ( p .cfg .PoolSize ) && p .idleConnsLen < p .cfg .MinIdleConns {
144145 select {
145146 case p .queue <- struct {}{}:
146- p .poolSize ++
147+ p .poolSize . Add ( 1 )
147148 p .idleConnsLen ++
148149 go func () {
149150 err := p .addIdleConn ()
150151 if err != nil && err != ErrClosed {
151152 p .connsMu .Lock ()
152- p .poolSize --
153+ p .poolSize . Add ( - 1 )
153154 p .idleConnsLen --
154155 p .connsMu .Unlock ()
155156 }
@@ -197,7 +198,7 @@ func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) {
197198 }
198199
199200 p .connsMu .Lock ()
200- if p .cfg .MaxActiveConns > 0 && p .poolSize >= p .cfg .MaxActiveConns {
201+ if p .cfg .MaxActiveConns > 0 && p .poolSize . Load () >= int32 ( p .cfg .MaxActiveConns ) {
201202 p .connsMu .Unlock ()
202203 return nil , ErrPoolExhausted
203204 }
@@ -214,18 +215,20 @@ func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) {
214215 p .connsMu .Lock ()
215216 defer p .connsMu .Unlock ()
216217
217- if p .cfg .MaxActiveConns > 0 && p .poolSize >= p .cfg .MaxActiveConns {
218+ if p .cfg .MaxActiveConns > 0 && p .poolSize . Load () >= int32 ( p .cfg .MaxActiveConns ) {
218219 _ = cn .Close ()
219220 return nil , ErrPoolExhausted
220221 }
221222
222223 p .conns = append (p .conns , cn )
223224 if pooled {
224225 // If pool is full remove the cn on next Put.
225- if p .poolSize >= p .cfg .PoolSize {
226+ currentPoolSize := p .poolSize .Load ()
227+ if currentPoolSize >= int32 (p .cfg .PoolSize ) {
228+ fmt .Printf ("Conn %d poolSize (%d) >= cfg.PoolSize (%d), setting pooled to false\n " , cn .GetID (), currentPoolSize , p .cfg .PoolSize )
226229 cn .pooled = false
227230 } else {
228- p .poolSize ++
231+ p .poolSize . Add ( 1 )
229232 }
230233 }
231234
@@ -314,9 +317,12 @@ func (p *ConnPool) getConn(ctx context.Context, isPubSub bool) (*Conn, error) {
314317 return nil , ErrClosed
315318 }
316319
317- //if err := p.waitTurn(ctx); err != nil {
318- //return nil, err
319- // }
320+ // if it is not a pubsub connection, we need to wait for a turn
321+ if ! isPubSub {
322+ if err := p .waitTurn (ctx ); err != nil {
323+ return nil , err
324+ }
325+ }
320326
321327 tries := 0
322328 now := time .Now ()
@@ -331,7 +337,10 @@ func (p *ConnPool) getConn(ctx context.Context, isPubSub bool) (*Conn, error) {
331337 p .connsMu .Unlock ()
332338
333339 if err != nil {
334- //p.freeTurn()
340+ if ! isPubSub {
341+ p .freeTurn ()
342+ }
343+
335344 return nil , err
336345 }
337346
@@ -340,6 +349,7 @@ func (p *ConnPool) getConn(ctx context.Context, isPubSub bool) (*Conn, error) {
340349 }
341350
342351 if ! p .isHealthyConn (cn , now ) {
352+ fmt .Printf ("Conn %d is not healthy, closing...\n " , cn .GetID ())
343353 _ = p .CloseConn (cn )
344354 continue
345355 }
@@ -348,6 +358,7 @@ func (p *ConnPool) getConn(ctx context.Context, isPubSub bool) (*Conn, error) {
348358 // Fast path: check processor existence once and cache the result
349359 if processor := p .cfg .ConnectionProcessor ; processor != nil {
350360 if err := processor .ProcessConnectionOnGet (ctx , cn ); err != nil {
361+ fmt .Printf ("Conn %d failed processor on get, closing...\n " , cn .GetID ())
351362 // Failed to process connection, discard it
352363 _ = p .CloseConn (cn )
353364 continue
@@ -357,20 +368,25 @@ func (p *ConnPool) getConn(ctx context.Context, isPubSub bool) (*Conn, error) {
357368 atomic .AddUint32 (& p .stats .Hits , 1 )
358369 cn .isPubSub = isPubSub
359370 if isPubSub {
371+ cn .pooled = false
372+ p .poolSize .Add (- 1 )
360373 atomic .AddUint32 (& p .stats .PubSubCount , 1 )
361374 }
362375 return cn , nil
363376 }
364377
365378 atomic .AddUint32 (& p .stats .Misses , 1 )
366379
367- newcn , err := p .newConn (ctx , true )
380+ newcn , err := p .newConn (ctx , ! isPubSub )
368381 if err != nil {
369- //p.freeTurn()
382+ if ! isPubSub {
383+ p .freeTurn ()
384+ }
370385 return nil , err
371386 }
372- newcn . isPubSub = isPubSub
387+
373388 if isPubSub {
389+ newcn .isPubSub = true
374390 atomic .AddUint32 (& p .stats .PubSubCount , 1 )
375391 }
376392 return newcn , nil
@@ -449,6 +465,8 @@ func (p *ConnPool) popIdle() (*Conn, error) {
449465 if cn .IsUsable () {
450466 p .idleConnsLen --
451467 break
468+ } else {
469+ fmt .Printf ("Connection %d is not usable, retrying...\n " , cn .GetID ())
452470 }
453471
454472 // Connection is not usable, put it back in the pool
@@ -478,6 +496,7 @@ func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
478496 shouldRemove := false
479497 var err error
480498 if cn .isPubSub {
499+ fmt .Printf ("Conn %d is pubsub, removing...\n " , cn .GetID ())
481500 p .Remove (ctx , cn , err )
482501 return
483502 }
@@ -492,19 +511,23 @@ func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
492511 }
493512 }
494513
514+ fmt .Printf ("Conn %d shouldPool: %v, shouldRemove: %v\n " , cn .GetID (), shouldPool , shouldRemove )
495515 // If processor says to remove the connection, do so
496516 if shouldRemove {
517+ fmt .Printf ("Conn %d shouldRemove, removing...\n " , cn .GetID ())
497518 p .Remove (ctx , cn , nil )
498519 return
499520 }
500521
501522 // If processor says not to pool the connection, remove it
502523 if ! shouldPool {
524+ fmt .Printf ("Conn %d !shouldPool, removing...\n " , cn .GetID ())
503525 p .Remove (ctx , cn , nil )
504526 return
505527 }
506528
507529 if ! cn .pooled {
530+ fmt .Printf ("Conn %d !cn.pooled, removing...\n " , cn .GetID ())
508531 p .Remove (ctx , cn , nil )
509532 return
510533 }
@@ -517,15 +540,17 @@ func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
517540 p .idleConns = append (p .idleConns , cn )
518541 p .idleConnsLen ++
519542 } else {
543+ fmt .Printf ("Conn %d p.idleConnsLen >= p.cfg.MaxIdleConns, removing...\n " , cn .GetID ())
520544 p .removeConn (cn )
521545 shouldCloseConn = true
522546 }
523547
524548 p .connsMu .Unlock ()
525549
526- // p.freeTurn()
550+ p .freeTurn ()
527551
528552 if shouldCloseConn {
553+ fmt .Printf ("Conn %d shouldCloseConn, closing...\n " , cn .GetID ())
529554 _ = p .closeConn (cn )
530555 }
531556}
@@ -552,7 +577,7 @@ func (p *ConnPool) removeConn(cn *Conn) {
552577 if c == cn {
553578 p .conns = append (p .conns [:i ], p .conns [i + 1 :]... )
554579 if cn .pooled {
555- p .poolSize --
580+ p .poolSize . Add ( - 1 )
556581 // Immediately check for minimum idle connections when a pooled connection is removed
557582 p .checkMinIdleConns ()
558583 }
@@ -634,7 +659,7 @@ func (p *ConnPool) Close() error {
634659 }
635660 }
636661 p .conns = nil
637- p .poolSize = 0
662+ p .poolSize . Store ( 0 )
638663 p .idleConns = nil
639664 p .idleConnsLen = 0
640665 p .connsMu .Unlock ()
0 commit comments