Skip to content

Commit

Permalink
move out forward logic to method, allowing for use as grpc.Server n…
Browse files Browse the repository at this point in the history
…ot found handler.
  • Loading branch information
Michal Witkowski committed Oct 23, 2015
1 parent 89e28b4 commit 28341d1
Showing 1 changed file with 48 additions and 43 deletions.
91 changes: 48 additions & 43 deletions proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,44 +13,44 @@
// limitations under the License.

package proxy

import (
"net"
"fmt"
"io"
"net"
"strings"
"sync"

"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/transport"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/codes"
"golang.org/x/net/context"
"google.golang.org/grpc/transport"
)


// transportWriter is a common interface between gRPC transport.ServerTransport and transport.ClientTransport.
type transportWriter interface {
Write(s *transport.Stream, data []byte, opts *transport.Options) error
}

type Proxy struct {
mu sync.Mutex
lis map[net.Listener]bool
conns map[transport.ServerTransport]bool
logger grpclog.Logger
mu sync.Mutex
lis map[net.Listener]bool
conns map[transport.ServerTransport]bool
logger grpclog.Logger
director StreamDirector
opts *options
opts *options
}

// NewServer creates a gRPC proxy which will use the `StreamDirector` for making routing decisions.
func NewServer(director StreamDirector, opt ...ProxyOption) *Proxy {
s := &Proxy{
lis: make(map[net.Listener]bool),
conns: make(map[transport.ServerTransport]bool),
opts: &options{},
lis: make(map[net.Listener]bool),
conns: make(map[transport.ServerTransport]bool),
opts: &options{},
director: director,
logger: &defaultLogger{},
logger: &defaultLogger{},
}
for _, o := range opt {
o(s.opts)
Expand Down Expand Up @@ -138,31 +138,53 @@ func (s *Proxy) handleStream(frontTrans transport.ServerTransport, frontStream *
}
return
}
ProxyStream(s.director, s.logger, frontTrans, frontStream)

}

backendTrans, backendStream, err := s.backendTransportStream(frontStream.Context())
// Stop stops the gRPC server. Once Stop returns, the server stops accepting
// connection requests and closes all the connected connections.
func (s *Proxy) Stop() {
s.mu.Lock()
listeners := s.lis
s.lis = nil
cs := s.conns
s.conns = nil
s.mu.Unlock()
for lis := range listeners {
lis.Close()
}
for c := range cs {
c.Close()
}
}

// ProxyStream performs a forward of a gRPC frontend stream to a backend.
func ProxyStream(director StreamDirector, logger grpclog.Logger, frontTrans transport.ServerTransport, frontStream *transport.Stream) {
backendTrans, backendStream, err := backendTransportStream(director, frontStream.Context())
if err != nil {
frontTrans.WriteStatus(frontStream, grpc.Code(err), grpc.ErrorDesc(err))
s.logger.Printf("proxy: Proxy.handleStream %v failed to allocate backend: %v", frontStream.Method(), err)
logger.Printf("proxy: Proxy.handleStream %v failed to allocate backend: %v", frontStream.Method(), err)
return
}
defer backendTrans.CloseStream(backendStream, nil)

// data coming from client call to backend
ingressPathChan := s.forwardDataFrames(frontStream, backendStream, backendTrans)
ingressPathChan := forwardDataFrames(frontStream, backendStream, backendTrans)

// custom header handling *must* be after some data is processed by the backend, otherwise there's a deadlock
headerMd, err := backendStream.Header()
if err == nil && len(headerMd) > 0 {
frontTrans.WriteHeader(frontStream, headerMd)
}
// data coming from backend back to client call
egressPathChan := s.forwardDataFrames(backendStream, frontStream, frontTrans)
egressPathChan := forwardDataFrames(backendStream, frontStream, frontTrans)

// wait for both data streams to complete.
egressErr := <- egressPathChan
ingressErr := <- ingressPathChan
egressErr := <-egressPathChan
ingressErr := <-ingressPathChan
if egressErr != nil || ingressErr != nil {
s.logger.Printf("proxy: Proxy.handleStream %v failure during transfer ingres: %v egress: %v", frontStream.Method(), ingressErr, egressErr)
logger.Printf("proxy: Proxy.handleStream %v failure during transfer ingres: %v egress: %v", frontStream.Method(), ingressErr, egressErr)
frontTrans.WriteStatus(frontStream, codes.Unavailable, fmt.Sprintf("problem in transfer ingress: %v egress: %v", ingressErr, egressErr))
return
}
Expand All @@ -175,8 +197,8 @@ func (s *Proxy) handleStream(frontTrans transport.ServerTransport, frontStream *
}

// backendTransportStream picks and establishes a Stream to the backend.
func (s *Proxy) backendTransportStream(ctx context.Context) (transport.ClientTransport, *transport.Stream, error) {
grpcConn, err := s.director(ctx)
func backendTransportStream(director StreamDirector, ctx context.Context) (transport.ClientTransport, *transport.Stream, error) {
grpcConn, err := director(ctx)
if err != nil {
if grpc.Code(err) != codes.Unknown { // rpcError check
return nil, nil, err
Expand All @@ -189,7 +211,7 @@ func (s *Proxy) backendTransportStream(ctx context.Context) (transport.ClientTra
frontendStream, _ := transport.StreamFromContext(ctx)
callHdr := &transport.CallHdr{
Method: frontendStream.Method(),
Host: "TODOFIXTLS", // TODO(michal): This can fail if the backend server is using TLS Hostname verification. Use conn.authority, once it's public?
Host: "TODOFIXTLS", // TODO(michal): This can fail if the backend server is using TLS Hostname verification. Use conn.authority, once it's public?
}
backendStream, err := backendTrans.NewStream(ctx, callHdr)
if err != nil {
Expand All @@ -200,10 +222,10 @@ func (s *Proxy) backendTransportStream(ctx context.Context) (transport.ClientTra

// forwardDataFrames moves data from one gRPC transport `Stream` to another in async fashion.
// It returns an error channel. `nil` on it signifies everything was fine, anything else is a serious problem.
func (s *Proxy) forwardDataFrames(srcStream *transport.Stream, dstStream *transport.Stream, dstTransport transportWriter) chan error {
func forwardDataFrames(srcStream *transport.Stream, dstStream *transport.Stream, dstTransport transportWriter) chan error {
ret := make(chan error)

go func () {
go func() {
data := make([]byte, 4096)
opt := &transport.Options{}
for {
Expand All @@ -227,22 +249,5 @@ func (s *Proxy) forwardDataFrames(srcStream *transport.Stream, dstStream *transp
close(ret)
}()
return ret

}

// Stop stops the gRPC server. Once Stop returns, the server stops accepting
// connection requests and closes all the connected connections.
func (s *Proxy) Stop() {
s.mu.Lock()
listeners := s.lis
s.lis = nil
cs := s.conns
s.conns = nil
s.mu.Unlock()
for lis := range listeners {
lis.Close()
}
for c := range cs {
c.Close()
}
}

0 comments on commit 28341d1

Please sign in to comment.