-
Notifications
You must be signed in to change notification settings - Fork 4
/
connection_pool.go
317 lines (268 loc) · 10.4 KB
/
connection_pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
package pool
import (
"github.com/Deeptiman/grpc-connection-library/interceptor"
pb "github.com/Deeptiman/grpc-connection-library/ping"
"github.com/Deeptiman/grpc-connection-library/retry"
"io/ioutil"
"os"
"reflect"
"sync/atomic"
"github.com/Deeptiman/go-batch"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
)
// ConnPool struct defines several fields to construct a connection pool. The gRPC connection instance replicated
// using batch processing and queried using reflect.SelectCase that gets the connection object
// from the list of cases as a pseudo-random choice.
//
// Conn: The base gRPC connection that keeps the initial client connection instance.
//
// MaxPoolSize: The maximum number of connections created concurrently in the connection pool.
//
// ConnInstanceBatch: The batch processing will help creates multiple replicas of base gRPC connection instances concurrently.
// https://github.com/Deeptiman/go-batch library used to perform the batch processing.
//
// ConnBatchQueue: The queue that stores the replicated connection batch items; Dequeue hands one back
// using the reflect.SelectCase pseudo-random selection described above.
//
// Options: The options will keep the connection pool configurations to create gRPC dialoptions for base connection instances.
//
// PipelineDoneChan: The connection pool runs a concurrency pipeline, so the "PipelineDoneChan" channel will be called after
// all the stages of the pipeline finishes.
//
// Log: The gRPC log will show the internal connection lifecycle that will be useful to debug the connection flow.
type ConnPool struct {
	Conn              *grpc.ClientConn
	MaxPoolSize       uint64
	ConnInstanceBatch *batch.Batch
	ConnBatchQueue    *Queue
	Options           *PoolConnOptions
	PipelineDoneChan  chan interface{}
	Log               grpclog.LoggerV2
}
// ConnectionInterceptor defines the interceptors[UnaryServer:UnaryClient] type for a gRPC connection.
type ConnectionInterceptor int

const (
	// ConnectionInterceptor constants
	UnaryServer ConnectionInterceptor = iota // server-side unary interceptor
	UnaryClient                              // client-side unary interceptor; the only kind ClientConn currently wires up
)
var (
	// DefaultConnBatch : maximum number of connection instances stored for a single batch
	DefaultConnBatch uint64 = 20
	// DefaultMaxPoolSize : the total size of the connection pool to store the gRPC connection instances
	DefaultMaxPoolSize uint64 = 60
	// DefaultScheme : gRPC connection scheme to override the default scheme "passthrough" to "dns"
	DefaultScheme string = "dns"
	// DefaultGrpcInsecure : the authentication [enable/disable] bool flag
	DefaultGrpcInsecure bool = true
	// DefaultInterceptor : the gRPC connection library currently only supports one type of interceptors to send msg to the server that doesn't expect a response
	DefaultInterceptor ConnectionInterceptor = UnaryClient
	// DefaultRetriableCodes : possible retriable gRPC connection failure codes
	DefaultRetriableCodes = []codes.Code{codes.Aborted, codes.Unknown, codes.ResourceExhausted, codes.Unavailable}

	// ConnIndex : presumably an index into the pool's select cases; not referenced in this file chunk — TODO confirm usage.
	ConnIndex uint64 = 0
	// ConnPoolPipeline : running count of connection instances produced by the pipeline.
	// Always accessed through sync/atomic (see GetConnPoolSize / ConnPoolPipeline method).
	ConnPoolPipeline uint64 = 0
	// ConnRecreateCount : number of times the pool has been flagged for recreation (incremented atomically).
	ConnRecreateCount uint64 = 0
	// IsConnRecreate : flags that the pool should be rebuilt on the next pipeline run.
	// NOTE(review): read and written without atomics or locks while other counters use
	// sync/atomic — potential data race under concurrent callers; confirm intended usage.
	// Package-level mutable state also means all ConnPool instances share these counters.
	IsConnRecreate bool = false
)
// NewConnPool builds a connection pool object with sensible defaults for the
// connection batch size, retry policy, resolver scheme, and interceptor kind,
// then applies any caller-supplied PoolOptions on top of those defaults.
func NewConnPool(opts ...PoolOptions) *ConnPool {
	defaultRetry := &retry.RetryOption{
		Retry: retry.DefaultRetryCount,
		Codes: DefaultRetriableCodes,
		Backoff: &retry.Backoff{
			Strategy: retry.Linear,
		},
	}

	pool := &ConnPool{
		MaxPoolSize: DefaultMaxPoolSize,
		Options: &PoolConnOptions{
			insecure:    DefaultGrpcInsecure,
			scheme:      DefaultScheme,
			interceptor: DefaultInterceptor,
			connBatch:   DefaultConnBatch,
			retryOption: defaultRetry,
		},
		PipelineDoneChan: make(chan interface{}),
		// Info level goes to stdout; warning/error writers are discarded.
		Log: grpclog.NewLoggerV2(os.Stdout, ioutil.Discard, ioutil.Discard),
	}

	// Functional options may override any of the defaults above.
	for _, apply := range opts {
		apply(pool)
	}

	// Propagate the (possibly option-supplied) target address into the retry config,
	// then size the batch processor and the connection queue from the final settings.
	pool.Options.retryOption.Address = pool.Options.address
	pool.ConnInstanceBatch = batch.NewBatch(batch.WithMaxItems(pool.Options.connBatch))
	pool.ConnBatchQueue = NewQueue(pool.MaxPoolSize)

	return pool
}
// ClientConn will create the initial gRPC client connection instance. The connection factory works as a higher
// order function handed to the retry package, so failed dials are retried according to the pool's
// retry policy instead of aborting the process.
func (c *ConnPool) ClientConn() (*grpc.ClientConn, error) {
	connectionFactory := func(address string) (*grpc.ClientConn, error) {
		var opts []grpc.DialOption
		if c.Options.authority != "" {
			opts = append(opts, grpc.WithAuthority(c.Options.authority))
		}
		if c.Options.insecure {
			opts = append(opts, grpc.WithInsecure())
		} else {
			opts = append(opts, grpc.WithTransportCredentials(c.Options.credentials))
		}
		if c.Options.interceptor == UnaryClient {
			// WithUnaryInterceptor DialOption parameter will set the UnaryClient option type for the interceptor RPC calls
			opts = append(opts, grpc.WithUnaryInterceptor(interceptor.UnaryClientInterceptor(c.Options.retryOption)))
		}
		// Prefix the configured resolver scheme (e.g. "dns") so grpc resolves through it
		// instead of the default "passthrough" scheme.
		address = c.Options.scheme + ":///" + address
		c.Log.Infoln("Dial GRPC Server ....", address)
		conn, err := grpc.Dial(address, opts...)
		if err != nil {
			// BUG FIX: this previously called c.Log.Fatal(err), which per the grpclog
			// LoggerV2 contract exits the process (os.Exit(1)), making the return below
			// unreachable and defeating retry.RetryClientConnection entirely. Log the
			// error and return it so the retry policy gets a chance to run.
			c.Log.Errorln("GRPC Dial failed: ", err)
			return nil, err
		}
		c.Conn = conn
		// gRPC connection instance creates Ping service to test connection health.
		client := pb.NewPingServiceClient(c.Conn)
		c.Log.Infoln("GRPC Client connected at - address : ", address, " : ConnState = ", c.Conn.GetState())
		// The SendPingMsg sends a test msg to the target server address to get the Pong response msg back to verify the connection flow.
		respMsg, err := pb.SendPingMsg(client)
		if err != nil {
			return nil, err
		}
		c.Log.Infoln("GRPC Pong msg - ", respMsg)
		return c.Conn, nil
	}
	return retry.RetryClientConnection(connectionFactory, c.Options.retryOption)
}
// ConnPoolPipeline follows the concurrency pipeline technique to create a connection pool in a higher
// concurrent scenarios. The pipeline has several stages that use the Fan-In, Fan-Out technique to process the
// data pipeline using channels.
//
// The entire process of creating the connection pool becomes a powerful function using the pipeline technique.
// There are four different stages in this pipeline that works as a generator pattern to create a connection pool.
//
// Completion is signalled by sending "Done" on pipelineDoneChan; the conn parameter is
// not read by this function body (the initial connection is created in stage 1).
func (c *ConnPool) ConnPoolPipeline(conn *grpc.ClientConn, pipelineDoneChan chan interface{}) {

	// 1#connInstancefn: This stage will create the initial gRPC connection instance that gets passed to the
	// next pipeline stage for replication.
	connInstancefn := func(done chan interface{}) <-chan *grpc.ClientConn {
		connCh := make(chan *grpc.ClientConn)
		conn, err := c.ClientConn()
		if err != nil {
			// NOTE(review): done is unbuffered and is only received from at the very
			// bottom of ConnPoolPipeline, after the for-range loop over this pipeline
			// finishes — this send looks like it can block forever on dial failure;
			// confirm intended error path.
			done <- err
		}
		go func() {
			c.Log.Infoln("1#connInstance ...")
			defer close(connCh)
			// Single-case select: a plain blocking send of the one connection instance.
			select {
			case connCh <- conn:
				c.Log.Infoln("GRPC Connection Status - ", conn.GetState().String())
			}
		}()
		return connCh
	}

	// 2#connReplicasfn: The cloning process of the initial gRPC connection object will begin here.
	// The connection instance gets passed to the next stage iteratively via channels.
	// Note the "replicas" are MaxPoolSize sends of the SAME *grpc.ClientConn pointer.
	connReplicasfn := func(connInstanceCh <-chan *grpc.ClientConn) <-chan *grpc.ClientConn {
		connInstanceReplicaCh := make(chan *grpc.ClientConn)
		go func() {
			c.Log.Infoln("2#connReplicas ...")
			defer close(connInstanceReplicaCh)
			for conn := range connInstanceCh {
				for i := 0; uint64(i) < c.MaxPoolSize; i++ {
					select {
					case connInstanceReplicaCh <- conn:
					}
				}
			}
		}()
		return connInstanceReplicaCh
	}

	// 3#connBatchfn: This stage will start the batch processing using the github.com/Deeptiman/go-batch library.
	// The MaxPoolSize is divided into multiple batches and released via a supply channel from go-batch library internal implementation.
	connBatchfn := func(connInstanceCh <-chan *grpc.ClientConn) chan []batch.BatchItems {
		go func() {
			c.Log.Infoln("3#connBatch ...")
			c.ConnInstanceBatch.StartBatchProcessing()
			// Feed each replicated connection into the go-batch item channel.
			for conn := range connInstanceCh {
				select {
				case c.ConnInstanceBatch.Item <- conn:
				}
			}
		}()
		return c.ConnInstanceBatch.Consumer.Supply.ClientSupplyCh
	}

	// 4#connEnqueuefn: The connection queue reads through the go-batch client supply channel and stores the connection
	// instances as channel case in []reflect.SelectCase. So, whenever the client requests a connection instance, reflect.SelectCase
	// retrieves the conn instances from the case using the pseudo-random technique.
	connEnqueuefn := func(connSupplyCh <-chan []batch.BatchItems) <-chan batch.BatchItems {
		receiveBatchCh := make(chan batch.BatchItems)
		go func() {
			c.Log.Infoln("4#connEnqueue ...")
			defer close(receiveBatchCh)
			for supply := range connSupplyCh {
				for _, s := range supply {
					// Store the item in the pool queue, then forward it downstream so the
					// driver loop below can count it toward MaxPoolSize.
					c.EnqueConnBatch(s)
					select {
					case receiveBatchCh <- s:
					}
				}
			}
		}()
		return receiveBatchCh
	}

	// Fast path: if the pool already holds connections, signal completion immediately.
	poolSize := c.GetConnPoolSize()
	if poolSize > 0 {
		select {
		case pipelineDoneChan <- "Done":
			c.Log.Infoln("Pool Exists - Size : ", poolSize)
			// When only one connection is left to hand out, flag the pool for recreation
			// so the next pipeline run rebuilds the queue.
			if (c.MaxPoolSize - poolSize) == 1 {
				atomic.AddUint64(&ConnRecreateCount, 1)
				IsConnRecreate = true
			}
			return
		}
	}

	// Recreate connection pool
	if IsConnRecreate {
		c.Log.Infoln("Connection Recreate !!!")
		// Reset the queue's select cases and enqueue channels before refilling.
		c.ConnBatchQueue.itemSelect = make([]reflect.SelectCase, c.MaxPoolSize)
		c.ConnBatchQueue.enqueCh = make([]chan batch.BatchItems, 0, c.MaxPoolSize)
		c.PipelineDoneChan = make(chan interface{})
		// NOTE(review): Unlock without a visible matching Lock — presumably releases a
		// lock taken inside go-batch's processing; verify against the go-batch library.
		c.ConnInstanceBatch.Unlock()
		IsConnRecreate = false
	}

	done := make(chan interface{})

	// Concurrency Pipeline
	// Drive all four stages; each delivered batch item bumps the pool-size counter,
	// and the goroutine that observes the counter reaching MaxPoolSize signals "Done".
	for s := range connEnqueuefn(connBatchfn(connReplicasfn(connInstancefn(done)))) {
		go func(s batch.BatchItems) {
			atomic.AddUint64(&ConnPoolPipeline, 1)
			if c.GetConnPoolSize() == c.MaxPoolSize {
				select {
				case pipelineDoneChan <- "Done":
					return
				}
			}
		}(s)
	}

	// Block until an error is reported on done (see stage 1).
	// NOTE(review): nothing closes or sends on done in the success path — this select
	// appears to block forever after a successful fill; confirm whether that is intended.
	select {
	case <-done:
		return
	}
}
// EnqueConnBatch will enqueue the batchItems received from the go-batch supply channel.
// Thin wrapper that delegates straight to ConnBatchQueue.Enqueue.
func (c *ConnPool) EnqueConnBatch(connItems batch.BatchItems) {
	c.ConnBatchQueue.Enqueue(connItems)
}
// GetConnBatch will retrieve the batch item from the connection queue that dequeues the items
// using the pseudo-random technique. It (re)runs the connection-pool pipeline in the background
// and blocks until the pipeline signals completion before dequeuing.
func (c *ConnPool) GetConnBatch() batch.BatchItems {
	// Kick off the pipeline; it sends "Done" on PipelineDoneChan when the pool is ready
	// (or already populated).
	go c.ConnPoolPipeline(c.Conn, c.PipelineDoneChan)

	// Block until the pipeline reports completion. (The previous version also allocated
	// an unused batchItemCh channel and wrapped this receive in a single-case select —
	// both removed as dead code.)
	<-c.PipelineDoneChan
	c.Log.Infoln("Pipeline Done Channel !")

	// Account for the connection instance we are about to hand out by decrementing
	// the pool-size counter.
	poolSize := c.GetConnPoolSize() - 1
	atomic.StoreUint64(&ConnPoolPipeline, poolSize)

	return c.ConnBatchQueue.Dequeue()
}
// GetConnPoolSize loads the number of connections created by the connection pool.
// It atomically reads the package-level ConnPoolPipeline counter, which the pipeline
// increments once per connection instance it enqueues.
func (c *ConnPool) GetConnPoolSize() uint64 {
	return atomic.LoadUint64(&ConnPoolPipeline)
}