grpc-connection-library provides a gRPC client-server connection interface that developers can use as gRPC middleware in their applications. The library is written in Go and uses a concurrency pipeline design pattern to synchronize the gRPC connection pool.
- The gRPC connection flow between server and client is synchronized using the Ping/Pong service.
- The library maintains a connection pool so that gRPC client connection instances can be reused.
- A concurrency pipeline design pattern synchronizes the data flow across several stages while the connection pool is created.
- Connection instances are selected from the pool with reflect.SelectCase, which chooses pseudo-randomly among the available cases.
- The go-batch processing library divides the connection instances in the pool into batches.
- The gRPC retry policy retries failed gRPC connections with a backoff strategy (see the dial-options sketch after this list).
- grpclog exposes the internal connection lifecycle, which is useful for debugging the connection flow.
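For reference, retry backoff and grpclog output can also be configured on a plain gRPC dial. The sketch below is illustrative only and uses standard google.golang.org/grpc options (grpc.WithConnectParams, backoff.Config, grpclog); the exact options this library applies internally may differ.

package main

import (
	"os"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/backoff"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/grpclog"
)

func main() {
	// Route gRPC's internal connection lifecycle logs to stderr for debugging.
	grpclog.SetLoggerV2(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr))

	// Exponential backoff for reconnect attempts on a failed connection.
	opts := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithConnectParams(grpc.ConnectParams{
			Backoff: backoff.Config{
				BaseDelay:  100 * time.Millisecond,
				Multiplier: 1.6,
				MaxDelay:   5 * time.Second,
			},
		}),
	}

	conn, err := grpc.Dial("localhost:50051", opts...)
	if err != nil {
		grpclog.Fatalf("dial failed: %v", err)
	}
	defer conn.Close()
}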
go get github.com/Deeptiman/grpc-connection-library
Documentation at pkg.go.dev
Client:
package main

import (
	"fmt"

	grpc "github.com/Deeptiman/grpc-connection-library/grpc"
)

func main() {
	address := "localhost:50051"

	client, err := grpc.NewGRPCConnection(grpc.WithAddress(address), grpc.WithConnectionType(grpc.Client))
	if err != nil {
		fmt.Println("Failed to create GRPC Connection - ", err.Error())
		return
	}

	conn, err := client.GetConn()
	if err != nil {
		return
	}
	fmt.Println("GRPC Client Connected State = ", conn.GetState().String())
}
Server:
package main

import (
	"fmt"

	grpc "github.com/Deeptiman/grpc-connection-library/grpc"
)

func main() {
	server, err := grpc.NewGRPCConnection(grpc.WithConnectionType(grpc.Server))
	if err != nil {
		fmt.Println("Failed to create GRPC Connection - ", err.Error())
		return
	}
	server.ListenAndServe()
}
The library provides a Ping/Pong service to test the client/server connection flow. The service acts as a connection health check.
protos:
syntax = "proto3";
package ping;
option go_package = ".;ping";
import "google/protobuf/timestamp.proto";
service PingService {
rpc SendPongMsg(Request) returns (Response) {}
}
message Request {
string message = 1;
}
message Pong {
int32 index = 1;
string message = 2;
google.protobuf.Timestamp received_on = 3;
}
message Response {
Pong pong = 1;
}
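The Go bindings for this proto can be regenerated with the standard protoc plugins; this assumes protoc-gen-go and protoc-gen-go-grpc are installed and the file is named ping.proto:

protoc --go_out=. --go-grpc_out=. ping.proto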
Server:
- ListenAndServe starts listening on the configured server port over the "tcp" network.
- After the server listener socket is opened, the Ping service is registered so that any gRPC client can send a Ping/Pong request to check the health of the client-server gRPC connection.
grpcServer := grpc.NewServer(serverOptions...)

// Register the Ping service request
pb.RegisterPingServiceServer(grpcServer, &pb.PingService{})

if err = grpcServer.Serve(listener); err != nil {
	g.log.Errorf("failed to start server - %v", err)
	g.log.Fatal(err)
	return err
}
Client:
- The gRPC client connection instance creates a Ping service client to test the connection health.
- SendPingMsg sends a test message to the target server address and waits for the Pong response to verify the connection flow.
conn, err := grpc.Dial(address, opts...)
if err != nil {
	c.Log.Fatal(err)
	return nil, err
}

client := pb.NewPingServiceClient(c.Conn)

respMsg, err := pb.SendPingMsg(client)
if err != nil {
	return nil, err
}
c.Log.Infoln("GRPC Pong msg - ", respMsg)
- ConnPoolPipeline() follows the concurrency pipeline technique to create a connection pool in highly concurrent scenarios. The pipeline has several stages that use the Fan-In/Fan-Out technique to move data between stages over channels.
- The pipeline turns connection pool creation into a composition of four stages, each acting as a generator that feeds the next stage.
This stage creates the initial gRPC connection instance and passes it to the next pipeline stage for replication.
connInstancefn := func(done chan interface{}) <-chan *grpc.ClientConn {
	connCh := make(chan *grpc.ClientConn)

	conn, err := c.ClientConn()
	if err != nil {
		done <- err
	}

	go func() {
		c.Log.Infoln("1#connInstance ...")
		defer close(connCh)
		select {
		case connCh <- conn:
			c.Log.Infoln("GRPC Connection Status - ", conn.GetState().String())
		}
	}()
	return connCh
}
The cloning of the initial gRPC connection object begins here. The connection instance is passed to the next stage iteratively via channels.
connReplicasfn := func(connInstanceCh <-chan *grpc.ClientConn) <-chan *grpc.ClientConn {
	connInstanceReplicaCh := make(chan *grpc.ClientConn)

	go func() {
		c.Log.Infoln("2#connReplicas ...")
		defer close(connInstanceReplicaCh)
		for conn := range connInstanceCh {
			for i := 0; uint64(i) < c.MaxPoolSize; i++ {
				select {
				case connInstanceReplicaCh <- conn:
				}
			}
		}
	}()
	return connInstanceReplicaCh
}
This stage starts batch processing using the github.com/Deeptiman/go-batch library. The MaxPoolSize set of connections is divided into multiple batches and released via the supply channel implemented inside the go-batch library.
connBatchfn := func(connInstanceCh <-chan *grpc.ClientConn) chan []batch.BatchItems {
	go func() {
		c.Log.Infoln("3#connBatch ...")
		c.ConnInstanceBatch.StartBatchProcessing()
		for conn := range connInstanceCh {
			select {
			case c.ConnInstanceBatch.Item <- conn:
			}
		}
	}()
	return c.ConnInstanceBatch.Consumer.Supply.ClientSupplyCh
}
The connection queue reads from the go-batch client supply channel and stores the connection instances as channel cases in []reflect.SelectCase. Whenever a client requests a connection instance, reflect.Select retrieves one of the stored cases using a pseudo-random technique.
connEnqueuefn := func(connSupplyCh <-chan []batch.BatchItems) <-chan batch.BatchItems {
	receiveBatchCh := make(chan batch.BatchItems)

	go func() {
		c.Log.Infoln("4#connEnqueue ...")
		defer close(receiveBatchCh)
		for supply := range connSupplyCh {
			for _, s := range supply {
				c.EnqueConnBatch(s)
				select {
				case receiveBatchCh <- s:
				}
			}
		}
	}()
	return receiveBatchCh
}
for s := range connEnqueuefn(connBatchfn(connReplicasfn(connInstancefn(done)))) {
	go func(s batch.BatchItems) {
		atomic.AddUint64(&ConnPoolPipeline, 1)
		if c.GetConnPoolSize() == c.MaxPoolSize {
			select {
			case pipelineDoneChan <- "Done":
				return
			}
		}
	}(s)
}
Each connection instance placed in the pool is wrapped in a go-batch BatchItems record:

type BatchItems struct {
	Id      int         `json:"id"`
	BatchNo int         `json:"batchNo"`
	Item    interface{} `json:"item"`
}
For example, 12 gRPC connection instances are stored in 3 batches:

Batch | Items
--- | ---
Batch-1 | Connection instances 1–4
Batch-2 | Connection instances 5–8
Batch-3 | Connection instances 9–12
So, the connection pool stores the connection instances as an array of batches, []batch.BatchItems{}.
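As an illustration of that layout, here is a hypothetical sketch (not library code) that maps 12 connection instances into 3 batches of 4 BatchItems each; only the BatchItems fields come from the definition above.

package pool

import (
	batch "github.com/Deeptiman/go-batch"
	grpc "google.golang.org/grpc"
)

// buildBatches is illustrative only: it shows how pool connections map onto
// batch.BatchItems with a running Id and a BatchNo per group of `size` items
// (size 4 over 12 connections yields 3 batches).
func buildBatches(conns []*grpc.ClientConn, size int) [][]batch.BatchItems {
	var batches [][]batch.BatchItems
	for i := 0; i < len(conns); i += size {
		var items []batch.BatchItems
		for j := i; j < i+size && j < len(conns); j++ {
			items = append(items, batch.BatchItems{
				Id:      j + 1,          // 1..12 across the whole pool
				BatchNo: (i / size) + 1, // 1..3
				Item:    conns[j],
			})
		}
		batches = append(batches, items)
	}
	return batches
}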
Connection instances are selected from the pool using reflect.SelectCase from the standard reflect package. The connection instances are stored as channel cases in a []reflect.SelectCase{}, and reflect.Select picks an available connection instance from the pool pseudo-randomly.
// The enqueCh slice stores the per-batch channels of batchItems that get added to the reflect.SelectCase set.
enqueCh []chan batch.BatchItems

q.itemSelect[i] = reflect.SelectCase{
	Dir:  reflect.SelectRecv,
	Chan: reflect.ValueOf(q.enqueCh[i]),
}

for {
	chosen, rcv, ok := reflect.Select(q.itemSelect)
	if !ok {
		q.log.Infoln("Conn Batch Instance Not Chosen = ", chosen)
		continue
	}
	q.log.Infoln("SelectCase", "Batch Conn : chosen = ", chosen)

	// Remove the selected case from the slice to avoid choosing the same case twice.
	q.itemSelect = append(q.itemSelect[:chosen], q.itemSelect[chosen+1:]...)

	return rcv.Interface().(batch.BatchItems)
}
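For readers unfamiliar with reflect.Select, here is a self-contained sketch of the same pseudo-random selection technique over plain channels (illustrative only, not library code):

package main

import (
	"fmt"
	"reflect"
)

func main() {
	// Three ready channels; reflect.Select picks uniformly at random
	// among the cases that can proceed.
	chs := make([]chan string, 3)
	cases := make([]reflect.SelectCase, 3)
	for i := range chs {
		chs[i] = make(chan string, 1)
		chs[i] <- fmt.Sprintf("conn-%d", i+1)
		cases[i] = reflect.SelectCase{
			Dir:  reflect.SelectRecv,
			Chan: reflect.ValueOf(chs[i]),
		}
	}

	for len(cases) > 0 {
		chosen, rcv, ok := reflect.Select(cases)
		if !ok {
			continue
		}
		fmt.Println("chosen:", chosen, "value:", rcv.Interface())
		// Drop the chosen case so each channel is picked exactly once,
		// mirroring the pool's de-duplication step above.
		cases = append(cases[:chosen], cases[chosen+1:]...)
	}
}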
This project is licensed under the MIT License.