4
4
"errors"
5
5
"fmt"
6
6
"net"
7
- "sync"
8
7
"testing"
9
8
"time"
10
9
@@ -14,15 +13,19 @@ import (
14
13
func TestPool_Reconnect (t * testing.T ) {
15
14
host , port , pass := getSonicConfig (t )
16
15
17
- closeAllCond := sync .NewCond (& sync.Mutex {})
18
16
proxyLn , proxyDoneCh := runTCPProxy (t ,
19
17
fmt .Sprintf ("%s:%d" , host , port ), // Target addr.
20
18
"127.0.0.1:0" , // Proxy addr.
21
19
)
22
20
23
21
proxyHost , proxyPort := mustSplitHostPort (t , proxyLn .Addr ().String ())
24
22
25
- ing , err := sonic .NewIngester (proxyHost , proxyPort , pass )
23
+ ing , err := sonic .NewIngester (
24
+ proxyHost ,
25
+ proxyPort ,
26
+ pass ,
27
+ sonic .OptionPoolPingThreshold (time .Nanosecond ),
28
+ )
26
29
if err != nil {
27
30
t .Fatal ("NewIngester" , err )
28
31
}
@@ -34,10 +37,6 @@ func TestPool_Reconnect(t *testing.T) {
34
37
t .Fatal ("Ping" , err )
35
38
}
36
39
37
- closeAllCond .L .Lock ()
38
- closeAllCond .Broadcast ()
39
- closeAllCond .L .Unlock ()
40
-
41
40
err = ing .Ping ()
42
41
if err != nil {
43
42
t .Fatal ("Ping" , err )
@@ -85,6 +84,80 @@ func TestPool_Reconnect(t *testing.T) {
85
84
}
86
85
}
87
86
87
+ func TestPool_Reconnect_Threshold (t * testing.T ) {
88
+ host , port , pass := getSonicConfig (t )
89
+
90
+ proxyLn , proxyDoneCh := runTCPProxy (t ,
91
+ fmt .Sprintf ("%s:%d" , host , port ), // Target addr.
92
+ "127.0.0.1:0" , // Proxy addr.
93
+ )
94
+
95
+ proxyHost , proxyPort := mustSplitHostPort (t , proxyLn .Addr ().String ())
96
+
97
+ ing , err := sonic .NewIngester (
98
+ proxyHost ,
99
+ proxyPort ,
100
+ pass ,
101
+ sonic .OptionPoolPingThreshold (time .Minute ),
102
+ )
103
+ if err != nil {
104
+ t .Fatal ("NewIngester" , err )
105
+ }
106
+
107
+ // Connection healthy, ping should work.
108
+
109
+ err = ing .Ping ()
110
+ if err != nil {
111
+ t .Fatal ("Ping" , err )
112
+ }
113
+
114
+ err = ing .Ping ()
115
+ if err != nil {
116
+ t .Fatal ("Ping" , err )
117
+ }
118
+
119
+ // Close connection, ping should not work.
120
+
121
+ err = proxyLn .Close ()
122
+ if err != nil {
123
+ t .Fatal ("Close" , err )
124
+ }
125
+
126
+ select {
127
+ case <- proxyDoneCh :
128
+ case <- time .After (2 * time .Second ):
129
+ t .Fatal ("Timeout" )
130
+ }
131
+
132
+ err = ing .Ping ()
133
+ if err == nil {
134
+ t .Fatal ("Ping" , err )
135
+ }
136
+
137
+ // Reconnect in threshold, ping still should not work.
138
+
139
+ proxyLn , proxyDoneCh = runTCPProxy (t ,
140
+ fmt .Sprintf ("%s:%d" , host , port ), // Target addr.
141
+ fmt .Sprintf ("%s:%d" , proxyHost , proxyPort ), // Proxy addr.
142
+ )
143
+
144
+ err = ing .Ping ()
145
+ if err == nil {
146
+ t .Fatal ("Ping" , err )
147
+ }
148
+
149
+ err = proxyLn .Close ()
150
+ if err != nil {
151
+ t .Fatal ("Close" , err )
152
+ }
153
+
154
+ select {
155
+ case <- proxyDoneCh :
156
+ case <- time .After (2 * time .Second ):
157
+ t .Fatal ("Timeout" )
158
+ }
159
+ }
160
+
88
161
func runTCPProxy (
89
162
tb testing.TB ,
90
163
targetAddr string ,
0 commit comments