@@ -60,10 +60,6 @@ func (mp *mockPool) Get(ctx context.Context) (*pool.Conn, error) {
6060 return nil , errors .New ("not implemented" )
6161}
6262
63- func (mp * mockPool ) GetPubSub (ctx context.Context ) (* pool.Conn , error ) {
64- return nil , errors .New ("not implemented" )
65- }
66-
6763func (mp * mockPool ) Put (ctx context.Context , conn * pool.Conn ) {
6864 // Not implemented for testing
6965}
@@ -107,18 +103,27 @@ func TestRedisConnectionProcessor(t *testing.T) {
107103 }
108104
109105 t .Run ("SuccessfulEventDrivenHandoff" , func (t * testing.T ) {
110- processor := NewRedisConnectionProcessor (3 , baseDialer , nil , nil )
106+ config := & Config {
107+ MinWorkers : 1 ,
108+ MaxWorkers : 2 ,
109+ HandoffQueueSize : 10 , // Explicit queue size to avoid 0-size queue
110+ LogLevel : 2 ,
111+ }
112+ processor := NewRedisConnectionProcessor (3 , baseDialer , config , nil )
111113 defer processor .Shutdown (context .Background ())
112114
113115 conn := createMockPoolConnection ()
114116 if err := conn .MarkForHandoff ("new-endpoint:6379" , 12345 ); err != nil {
115117 t .Fatalf ("Failed to mark connection for handoff: %v" , err )
116118 }
117119
118- // Set a mock initialization function
119- initConnCalled := false
120+ // Set a mock initialization function with synchronization
121+ initConnCalled := make ( chan bool , 1 )
120122 initConnFunc := func (ctx context.Context , cn * pool.Conn ) error {
121- initConnCalled = true
123+ select {
124+ case initConnCalled <- true :
125+ default :
126+ }
122127 return nil
123128 }
124129 conn .SetInitConnFunc (initConnFunc )
@@ -142,22 +147,44 @@ func TestRedisConnectionProcessor(t *testing.T) {
142147 t .Error ("Connection should be in pending handoffs map" )
143148 }
144149
145- // Wait for handoff to complete
146- time .Sleep (100 * time .Millisecond )
150+ // Wait for initialization to be called (indicates handoff started)
151+ select {
152+ case <- initConnCalled :
153+ // Good, initialization was called
154+ case <- time .After (1 * time .Second ):
155+ t .Fatal ("Timeout waiting for initialization function to be called" )
156+ }
157+
158+ // Wait for handoff to complete with proper timeout and polling
159+ timeout := time .After (2 * time .Second )
160+ ticker := time .NewTicker (10 * time .Millisecond )
161+ defer ticker .Stop ()
162+
163+ handoffCompleted := false
164+ for ! handoffCompleted {
165+ select {
166+ case <- timeout :
167+ t .Fatal ("Timeout waiting for handoff to complete" )
168+ case <- ticker .C :
169+ if _ , pending := processor .pending .Load (conn ); ! pending {
170+ handoffCompleted = true
171+ }
172+ }
173+ }
147174
148175 // Verify handoff completed (removed from pending map)
149176 if _ , pending := processor .pending .Load (conn ); pending {
150177 t .Error ("Connection should be removed from pending map after handoff" )
151178 }
152179
153- // Verify handoff state was cleared
154- if conn .ShouldHandoff () {
155- t .Error ("Connection should not be marked for handoff after successful handoff" )
180+ // Verify connection is usable again
181+ if ! conn .IsUsable () {
182+ t .Error ("Connection should be usable after successful handoff" )
156183 }
157184
158- // Verify initialization was called
159- if ! initConnCalled {
160- t .Error ("InitConn should have been called " )
185+ // Verify handoff state is cleared
186+ if conn . ShouldHandoff () {
187+ t .Error ("Connection should not be marked for handoff after completion " )
161188 }
162189 })
163190
@@ -214,7 +241,13 @@ func TestRedisConnectionProcessor(t *testing.T) {
214241 return nil , errors .New ("dial failed" )
215242 }
216243
217- processor := NewRedisConnectionProcessor (3 , failingDialer , nil , nil )
244+ config := & Config {
245+ MinWorkers : 1 ,
246+ MaxWorkers : 2 ,
247+ HandoffQueueSize : 10 , // Explicit queue size to avoid 0-size queue
248+ LogLevel : 2 ,
249+ }
250+ processor := NewRedisConnectionProcessor (3 , failingDialer , config , nil )
218251 defer processor .Shutdown (context .Background ())
219252
220253 conn := createMockPoolConnection ()
@@ -236,8 +269,22 @@ func TestRedisConnectionProcessor(t *testing.T) {
236269 t .Error ("Connection should not be removed when queuing handoff" )
237270 }
238271
239- // Wait for handoff to complete and fail
240- time .Sleep (100 * time .Millisecond )
272+ // Wait for handoff to complete and fail with proper timeout and polling
273+ timeout := time .After (2 * time .Second )
274+ ticker := time .NewTicker (10 * time .Millisecond )
275+ defer ticker .Stop ()
276+
277+ handoffCompleted := false
278+ for ! handoffCompleted {
279+ select {
280+ case <- timeout :
281+ t .Fatal ("Timeout waiting for failed handoff to complete" )
282+ case <- ticker .C :
283+ if _ , pending := processor .pending .Load (conn ); ! pending {
284+ handoffCompleted = true
285+ }
286+ }
287+ }
241288
242289 // Connection should be removed from pending map after failed handoff
243290 if _ , pending := processor .pending .Load (conn ); pending {
@@ -285,7 +332,13 @@ func TestRedisConnectionProcessor(t *testing.T) {
285332 })
286333
287334 t .Run ("ProcessConnectionOnGetWithPendingHandoff" , func (t * testing.T ) {
288- processor := NewRedisConnectionProcessor (3 , baseDialer , nil , nil )
335+ config := & Config {
336+ MinWorkers : 1 ,
337+ MaxWorkers : 2 ,
338+ HandoffQueueSize : 10 , // Explicit queue size to avoid 0-size queue
339+ LogLevel : 2 ,
340+ }
341+ processor := NewRedisConnectionProcessor (3 , baseDialer , config , nil )
289342 defer processor .Shutdown (context .Background ())
290343
291344 conn := createMockPoolConnection ()
@@ -468,8 +521,22 @@ func TestRedisConnectionProcessor(t *testing.T) {
468521 t .Error ("Connection should be pooled after handoff" )
469522 }
470523
471- // Wait for handoff to complete
472- time .Sleep (50 * time .Millisecond )
524+ // Wait for handoff to complete with proper timeout and polling
525+ timeout := time .After (1 * time .Second )
526+ ticker := time .NewTicker (5 * time .Millisecond )
527+ defer ticker .Stop ()
528+
529+ handoffCompleted := false
530+ for ! handoffCompleted {
531+ select {
532+ case <- timeout :
533+ t .Fatal ("Timeout waiting for handoff to complete" )
534+ case <- ticker .C :
535+ if _ , pending := processor .pending .Load (conn ); ! pending {
536+ handoffCompleted = true
537+ }
538+ }
539+ }
473540
474541 // Verify relaxed timeout is set with deadline
475542 if ! conn .HasRelaxedTimeout () {
@@ -626,17 +693,15 @@ func TestRedisConnectionProcessor(t *testing.T) {
626693 }
627694 }
628695
629- // Verify queue has items but capacity remains static
630- currentQueueSize := len (processor .handoffQueue )
631- if currentQueueSize == 0 {
632- t .Error ("Expected some items in queue after processing connections" )
633- }
634-
696+ // Verify queue capacity remains static (the main purpose of this test)
635697 finalCapacity := cap (processor .handoffQueue )
636698 if finalCapacity != 50 {
637699 t .Errorf ("Queue capacity should remain static at 50, got %d" , finalCapacity )
638700 }
639701
702+ // Note: We don't check queue size here because workers process items quickly
703+ // The important thing is that the capacity remains static regardless of pool size
704+ currentQueueSize := len (processor .handoffQueue )
640705 t .Logf ("Static queue test completed - Capacity: %d, Current size: %d" ,
641706 finalCapacity , currentQueueSize )
642707 })
@@ -738,7 +803,21 @@ func TestRedisConnectionProcessor(t *testing.T) {
738803 }
739804
740805 // Wait for the handoff to complete (it happens asynchronously)
741- time .Sleep (50 * time .Millisecond )
806+ timeout := time .After (1 * time .Second )
807+ ticker := time .NewTicker (5 * time .Millisecond )
808+ defer ticker .Stop ()
809+
810+ handoffCompleted := false
811+ for ! handoffCompleted {
812+ select {
813+ case <- timeout :
814+ t .Fatal ("Timeout waiting for handoff to complete" )
815+ case <- ticker .C :
816+ if _ , pending := processor .pending .Load (conn ); ! pending {
817+ handoffCompleted = true
818+ }
819+ }
820+ }
742821
743822 // Verify that relaxed timeout was applied to the new connection
744823 if ! conn .HasRelaxedTimeout () {
0 commit comments