Skip to content

Commit f1e8761

Browse files
authored
Merge branch 'master' into hybrid-search
2 parents 43c5b39 + ae5434c commit f1e8761

File tree

12 files changed

+1381
-100
lines changed

12 files changed

+1381
-100
lines changed

async_handoff_integration_test.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,9 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
5353
Dialer: func(ctx context.Context) (net.Conn, error) {
5454
return &mockNetConn{addr: "original:6379"}, nil
5555
},
56-
PoolSize: int32(5),
57-
PoolTimeout: time.Second,
56+
PoolSize: int32(5),
57+
MaxConcurrentDials: 5,
58+
PoolTimeout: time.Second,
5859
})
5960

6061
// Add the hook to the pool after creation
@@ -153,8 +154,9 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
153154
return &mockNetConn{addr: "original:6379"}, nil
154155
},
155156

156-
PoolSize: int32(10),
157-
PoolTimeout: time.Second,
157+
PoolSize: int32(10),
158+
MaxConcurrentDials: 10,
159+
PoolTimeout: time.Second,
158160
})
159161
defer testPool.Close()
160162

@@ -225,8 +227,9 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
225227
return &mockNetConn{addr: "original:6379"}, nil
226228
},
227229

228-
PoolSize: int32(3),
229-
PoolTimeout: time.Second,
230+
PoolSize: int32(3),
231+
MaxConcurrentDials: 3,
232+
PoolTimeout: time.Second,
230233
})
231234
defer testPool.Close()
232235

@@ -288,8 +291,9 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
288291
return &mockNetConn{addr: "original:6379"}, nil
289292
},
290293

291-
PoolSize: int32(2),
292-
PoolTimeout: time.Second,
294+
PoolSize: int32(2),
295+
MaxConcurrentDials: 2,
296+
PoolTimeout: time.Second,
293297
})
294298
defer testPool.Close()
295299

extra/redisotel/metrics.go

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package redisotel
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"net"
78
"sync"
@@ -271,9 +272,10 @@ func (mh *metricsHook) DialHook(hook redis.DialHook) redis.DialHook {
271272

272273
dur := time.Since(start)
273274

274-
attrs := make([]attribute.KeyValue, 0, len(mh.attrs)+1)
275+
attrs := make([]attribute.KeyValue, 0, len(mh.attrs)+2)
275276
attrs = append(attrs, mh.attrs...)
276277
attrs = append(attrs, statusAttr(err))
278+
attrs = append(attrs, errorTypeAttribute(err))
277279

278280
mh.createTime.Record(ctx, milliseconds(dur), metric.WithAttributeSet(attribute.NewSet(attrs...)))
279281
return conn, err
@@ -288,10 +290,11 @@ func (mh *metricsHook) ProcessHook(hook redis.ProcessHook) redis.ProcessHook {
288290

289291
dur := time.Since(start)
290292

291-
attrs := make([]attribute.KeyValue, 0, len(mh.attrs)+2)
293+
attrs := make([]attribute.KeyValue, 0, len(mh.attrs)+3)
292294
attrs = append(attrs, mh.attrs...)
293295
attrs = append(attrs, attribute.String("type", "command"))
294296
attrs = append(attrs, statusAttr(err))
297+
attrs = append(attrs, errorTypeAttribute(err))
295298

296299
mh.useTime.Record(ctx, milliseconds(dur), metric.WithAttributeSet(attribute.NewSet(attrs...)))
297300

@@ -309,10 +312,11 @@ func (mh *metricsHook) ProcessPipelineHook(
309312

310313
dur := time.Since(start)
311314

312-
attrs := make([]attribute.KeyValue, 0, len(mh.attrs)+2)
315+
attrs := make([]attribute.KeyValue, 0, len(mh.attrs)+3)
313316
attrs = append(attrs, mh.attrs...)
314317
attrs = append(attrs, attribute.String("type", "pipeline"))
315318
attrs = append(attrs, statusAttr(err))
319+
attrs = append(attrs, errorTypeAttribute(err))
316320

317321
mh.useTime.Record(ctx, milliseconds(dur), metric.WithAttributeSet(attribute.NewSet(attrs...)))
318322

@@ -330,3 +334,16 @@ func statusAttr(err error) attribute.KeyValue {
330334
}
331335
return attribute.String("status", "ok")
332336
}
337+
338+
func errorTypeAttribute(err error) attribute.KeyValue {
339+
switch {
340+
case err == nil:
341+
return attribute.String("error_type", "none")
342+
case errors.Is(err, context.Canceled):
343+
return attribute.String("error_type", "context_canceled")
344+
case errors.Is(err, context.DeadlineExceeded):
345+
return attribute.String("error_type", "context_timeout")
346+
default:
347+
return attribute.String("error_type", "other")
348+
}
349+
}

internal/pool/bench_test.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,12 @@ func BenchmarkPoolGetPut(b *testing.B) {
3131
for _, bm := range benchmarks {
3232
b.Run(bm.String(), func(b *testing.B) {
3333
connPool := pool.NewConnPool(&pool.Options{
34-
Dialer: dummyDialer,
35-
PoolSize: int32(bm.poolSize),
36-
PoolTimeout: time.Second,
37-
DialTimeout: 1 * time.Second,
38-
ConnMaxIdleTime: time.Hour,
34+
Dialer: dummyDialer,
35+
PoolSize: int32(bm.poolSize),
36+
MaxConcurrentDials: bm.poolSize,
37+
PoolTimeout: time.Second,
38+
DialTimeout: 1 * time.Second,
39+
ConnMaxIdleTime: time.Hour,
3940
})
4041

4142
b.ResetTimer()
@@ -75,11 +76,12 @@ func BenchmarkPoolGetRemove(b *testing.B) {
7576
for _, bm := range benchmarks {
7677
b.Run(bm.String(), func(b *testing.B) {
7778
connPool := pool.NewConnPool(&pool.Options{
78-
Dialer: dummyDialer,
79-
PoolSize: int32(bm.poolSize),
80-
PoolTimeout: time.Second,
81-
DialTimeout: 1 * time.Second,
82-
ConnMaxIdleTime: time.Hour,
79+
Dialer: dummyDialer,
80+
PoolSize: int32(bm.poolSize),
81+
MaxConcurrentDials: bm.poolSize,
82+
PoolTimeout: time.Second,
83+
DialTimeout: 1 * time.Second,
84+
ConnMaxIdleTime: time.Hour,
8385
})
8486

8587
b.ResetTimer()

internal/pool/buffer_size_test.go

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,10 @@ var _ = Describe("Buffer Size Configuration", func() {
2424

2525
It("should use default buffer sizes when not specified", func() {
2626
connPool = pool.NewConnPool(&pool.Options{
27-
Dialer: dummyDialer,
28-
PoolSize: int32(1),
29-
PoolTimeout: 1000,
27+
Dialer: dummyDialer,
28+
PoolSize: int32(1),
29+
MaxConcurrentDials: 1,
30+
PoolTimeout: 1000,
3031
})
3132

3233
cn, err := connPool.NewConn(ctx)
@@ -46,11 +47,12 @@ var _ = Describe("Buffer Size Configuration", func() {
4647
customWriteSize := 64 * 1024 // 64KB
4748

4849
connPool = pool.NewConnPool(&pool.Options{
49-
Dialer: dummyDialer,
50-
PoolSize: int32(1),
51-
PoolTimeout: 1000,
52-
ReadBufferSize: customReadSize,
53-
WriteBufferSize: customWriteSize,
50+
Dialer: dummyDialer,
51+
PoolSize: int32(1),
52+
MaxConcurrentDials: 1,
53+
PoolTimeout: 1000,
54+
ReadBufferSize: customReadSize,
55+
WriteBufferSize: customWriteSize,
5456
})
5557

5658
cn, err := connPool.NewConn(ctx)
@@ -67,11 +69,12 @@ var _ = Describe("Buffer Size Configuration", func() {
6769

6870
It("should handle zero buffer sizes by using defaults", func() {
6971
connPool = pool.NewConnPool(&pool.Options{
70-
Dialer: dummyDialer,
71-
PoolSize: int32(1),
72-
PoolTimeout: 1000,
73-
ReadBufferSize: 0, // Should use default
74-
WriteBufferSize: 0, // Should use default
72+
Dialer: dummyDialer,
73+
PoolSize: int32(1),
74+
MaxConcurrentDials: 1,
75+
PoolTimeout: 1000,
76+
ReadBufferSize: 0, // Should use default
77+
WriteBufferSize: 0, // Should use default
7578
})
7679

7780
cn, err := connPool.NewConn(ctx)
@@ -103,9 +106,10 @@ var _ = Describe("Buffer Size Configuration", func() {
103106
// Test the scenario where someone creates a pool directly (like in tests)
104107
// without setting ReadBufferSize and WriteBufferSize
105108
connPool = pool.NewConnPool(&pool.Options{
106-
Dialer: dummyDialer,
107-
PoolSize: int32(1),
108-
PoolTimeout: 1000,
109+
Dialer: dummyDialer,
110+
PoolSize: int32(1),
111+
MaxConcurrentDials: 1,
112+
PoolTimeout: 1000,
109113
// ReadBufferSize and WriteBufferSize are not set (will be 0)
110114
})
111115

internal/pool/hooks_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,8 +191,9 @@ func TestPoolWithHooks(t *testing.T) {
191191
Dialer: func(ctx context.Context) (net.Conn, error) {
192192
return &net.TCPConn{}, nil // Mock connection
193193
},
194-
PoolSize: 1,
195-
DialTimeout: time.Second,
194+
PoolSize: 1,
195+
MaxConcurrentDials: 1,
196+
DialTimeout: time.Second,
196197
}
197198

198199
pool := NewConnPool(opt)

internal/pool/pool.go

Lines changed: 104 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ type Options struct {
9898

9999
PoolFIFO bool
100100
PoolSize int32
101+
MaxConcurrentDials int
101102
DialTimeout time.Duration
102103
PoolTimeout time.Duration
103104
MinIdleConns int32
@@ -126,7 +127,9 @@ type ConnPool struct {
126127
dialErrorsNum uint32 // atomic
127128
lastDialError atomic.Value
128129

129-
queue chan struct{}
130+
queue chan struct{}
131+
dialsInProgress chan struct{}
132+
dialsQueue *wantConnQueue
130133

131134
connsMu sync.Mutex
132135
conns map[uint64]*Conn
@@ -152,9 +155,11 @@ func NewConnPool(opt *Options) *ConnPool {
152155
p := &ConnPool{
153156
cfg: opt,
154157

155-
queue: make(chan struct{}, opt.PoolSize),
156-
conns: make(map[uint64]*Conn),
157-
idleConns: make([]*Conn, 0, opt.PoolSize),
158+
queue: make(chan struct{}, opt.PoolSize),
159+
conns: make(map[uint64]*Conn),
160+
dialsInProgress: make(chan struct{}, opt.MaxConcurrentDials),
161+
dialsQueue: newWantConnQueue(),
162+
idleConns: make([]*Conn, 0, opt.PoolSize),
158163
}
159164

160165
// Only create MinIdleConns if explicitly requested (> 0)
@@ -233,6 +238,7 @@ func (p *ConnPool) checkMinIdleConns() {
233238
return
234239
}
235240
}
241+
236242
}
237243

238244
func (p *ConnPool) addIdleConn() error {
@@ -491,9 +497,8 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {
491497

492498
atomic.AddUint32(&p.stats.Misses, 1)
493499

494-
newcn, err := p.newConn(ctx, true)
500+
newcn, err := p.queuedNewConn(ctx)
495501
if err != nil {
496-
p.freeTurn()
497502
return nil, err
498503
}
499504

@@ -512,6 +517,99 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {
512517
return newcn, nil
513518
}
514519

520+
func (p *ConnPool) queuedNewConn(ctx context.Context) (*Conn, error) {
521+
select {
522+
case p.dialsInProgress <- struct{}{}:
523+
// Got permission, proceed to create connection
524+
case <-ctx.Done():
525+
p.freeTurn()
526+
return nil, ctx.Err()
527+
}
528+
529+
dialCtx, cancel := context.WithTimeout(context.Background(), p.cfg.DialTimeout)
530+
531+
w := &wantConn{
532+
ctx: dialCtx,
533+
cancelCtx: cancel,
534+
result: make(chan wantConnResult, 1),
535+
}
536+
var err error
537+
defer func() {
538+
if err != nil {
539+
if cn := w.cancel(); cn != nil {
540+
p.putIdleConn(ctx, cn)
541+
p.freeTurn()
542+
}
543+
}
544+
}()
545+
546+
p.dialsQueue.enqueue(w)
547+
548+
go func(w *wantConn) {
549+
var freeTurnCalled bool
550+
defer func() {
551+
if err := recover(); err != nil {
552+
if !freeTurnCalled {
553+
p.freeTurn()
554+
}
555+
internal.Logger.Printf(context.Background(), "queuedNewConn panic: %+v", err)
556+
}
557+
}()
558+
559+
defer w.cancelCtx()
560+
defer func() { <-p.dialsInProgress }() // Release connection creation permission
561+
562+
dialCtx := w.getCtxForDial()
563+
cn, cnErr := p.newConn(dialCtx, true)
564+
delivered := w.tryDeliver(cn, cnErr)
565+
if cnErr == nil && delivered {
566+
return
567+
} else if cnErr == nil && !delivered {
568+
p.putIdleConn(dialCtx, cn)
569+
p.freeTurn()
570+
freeTurnCalled = true
571+
} else {
572+
p.freeTurn()
573+
freeTurnCalled = true
574+
}
575+
}(w)
576+
577+
select {
578+
case <-ctx.Done():
579+
err = ctx.Err()
580+
return nil, err
581+
case result := <-w.result:
582+
err = result.err
583+
return result.cn, err
584+
}
585+
}
586+
587+
func (p *ConnPool) putIdleConn(ctx context.Context, cn *Conn) {
588+
for {
589+
w, ok := p.dialsQueue.dequeue()
590+
if !ok {
591+
break
592+
}
593+
if w.tryDeliver(cn, nil) {
594+
return
595+
}
596+
}
597+
598+
cn.SetUsable(true)
599+
600+
p.connsMu.Lock()
601+
defer p.connsMu.Unlock()
602+
603+
if p.closed() {
604+
_ = cn.Close()
605+
return
606+
}
607+
608+
// poolSize is increased in newConn
609+
p.idleConns = append(p.idleConns, cn)
610+
p.idleConnsLen.Add(1)
611+
}
612+
515613
func (p *ConnPool) waitTurn(ctx context.Context) error {
516614
select {
517615
case <-ctx.Done():

0 commit comments

Comments
 (0)