diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..1b38744 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,13 @@ +sudo: false +language: go +go: + - 1.7 + - 1.8 + +install: + - go get google.golang.org/grpc + - go get golang.org/x/net/context + - go get github.com/stretchr/testify + +script: + - go test -race -v ./... diff --git a/director.go b/director.go deleted file mode 100644 index 7563f7e..0000000 --- a/director.go +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright © 2015 Michal Witkowski -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License.` - -package proxy -import ( - "golang.org/x/net/context" - "google.golang.org/grpc" -) - -// StreamDirector returns a gRPC ClientConn for a stream of a given context. -// The service name, method name, and other `MD` metadata (e.g. authority) can be extracted from Context. -type StreamDirector func(ctx context.Context) (*grpc.ClientConn, error) diff --git a/patch/get_transport.go b/patch/get_transport.go deleted file mode 100644 index 562d23e..0000000 --- a/patch/get_transport.go +++ /dev/null @@ -1,11 +0,0 @@ -package grpc - -import ( - "golang.org/x/net/context" - "google.golang.org/grpc/transport" -) - -// GetTransport returns the balancer of the connection. -func (cc *ClientConn) GetTransport(ctx context.Context) (transport.ClientTransport, func(), error) { - return cc.getTransport(ctx, BalancerGetOptions{}) -} diff --git a/proxy/DOC.md b/proxy/DOC.md new file mode 100644 index 0000000..85c411a --- /dev/null +++ b/proxy/DOC.md @@ -0,0 +1,83 @@ +# proxy +-- + import "github.com/mwitkow/grpc-proxy/proxy" + +Package proxy provides a reverse proxy handler for gRPC. + +The implementation allows a `grpc.Server` to pass a received ServerStream to a +ClientStream without understanding the semantics of the messages exchanged. It +basically provides a transparent reverse-proxy. + +This package is intentionally generic, exposing a `StreamDirector` function that +allows users of this package to implement whatever logic of backend-picking, +dialing and service verification to perform. + +See examples on documented functions. + +## Usage + +#### func Codec + +```go +func Codec() grpc.Codec +``` +Codec returns a proxying grpc.Codec with the default protobuf codec as parent. + +See CodecWithParent. + +#### func CodecWithParent + +```go +func CodecWithParent(fallback grpc.Codec) grpc.Codec +``` +CodecWithParent returns a proxying grpc.Codec with a user provided codec as +parent. + +This codec is *crucial* to the functioning of the proxy. It allows the proxy +server to be oblivious to the schema of the forwarded messages. It basically +treats a gRPC message frame as raw bytes. However, if the server handler, or the +client caller are not proxy-internal functions it will fall back to trying to +decode the message using a fallback codec. + +#### func RegisterService + +```go +func RegisterService(server *grpc.Server, director StreamDirector, serviceName string, methodNames ...string) +``` +RegisterService sets up a proxy handler for a particular gRPC service and +method. The behaviour is the same as if you were registering a handler method, +e.g. from a codegenerated pb.go file. + +This can *only* be used if the `server` also uses grpcproxy.CodecForServer() +ServerOption. + +#### func TransparentHandler + +```go +func TransparentHandler(director StreamDirector) grpc.StreamHandler +``` +TransparentHandler returns a handler that attempts to proxy all requests that +are not registered in the server. The indented use here is as a transparent +proxy, where the server doesn't know about the services implemented by the +backends. It should be used as a `grpc.UnknownServiceHandler`. + +This can *only* be used if the `server` also uses grpcproxy.CodecForServer() +ServerOption. + +#### type StreamDirector + +```go +type StreamDirector func(ctx context.Context, fullMethodName string) (*grpc.ClientConn, error) +``` + +StreamDirector returns a gRPC ClientConn to be used to forward the call to. + +The presence of the `Context` allows for rich filtering, e.g. based on Metadata +(headers). If no handling is meant to be done, a `codes.NotImplemented` gRPC +error should be returned. + +It is worth noting that the StreamDirector will be fired *after* all server-side +stream interceptors are invoked. So decisions around authorization, monitoring +etc. are better to be handled there. + +See the rather rich example. diff --git a/proxy/README.md b/proxy/README.md new file mode 120000 index 0000000..71bfc07 --- /dev/null +++ b/proxy/README.md @@ -0,0 +1 @@ +DOC.md \ No newline at end of file diff --git a/proxy/codec.go b/proxy/codec.go new file mode 100644 index 0000000..846b9c4 --- /dev/null +++ b/proxy/codec.go @@ -0,0 +1,70 @@ +package proxy + +import ( + "fmt" + + "github.com/golang/protobuf/proto" + "google.golang.org/grpc" +) + +// Codec returns a proxying grpc.Codec with the default protobuf codec as parent. +// +// See CodecWithParent. +func Codec() grpc.Codec { + return CodecWithParent(&protoCodec{}) +} + +// CodecWithParent returns a proxying grpc.Codec with a user provided codec as parent. +// +// This codec is *crucial* to the functioning of the proxy. It allows the proxy server to be oblivious +// to the schema of the forwarded messages. It basically treats a gRPC message frame as raw bytes. +// However, if the server handler, or the client caller are not proxy-internal functions it will fall back +// to trying to decode the message using a fallback codec. +func CodecWithParent(fallback grpc.Codec) grpc.Codec { + return &rawCodec{fallback} +} + +type rawCodec struct { + parentCodec grpc.Codec +} + +type frame struct { + payload []byte +} + +func (c *rawCodec) Marshal(v interface{}) ([]byte, error) { + out, ok := v.(*frame) + if !ok { + return c.parentCodec.Marshal(v) + } + return out.payload, nil + +} + +func (c *rawCodec) Unmarshal(data []byte, v interface{}) error { + dst, ok := v.(*frame) + if !ok { + return c.parentCodec.Unmarshal(data, v) + } + dst.payload = data + return nil +} + +func (c *rawCodec) String() string { + return fmt.Sprintf("proxy>%s", c.parentCodec.String()) +} + +// protoCodec is a Codec implementation with protobuf. It is the default rawCodec for gRPC. +type protoCodec struct{} + +func (protoCodec) Marshal(v interface{}) ([]byte, error) { + return proto.Marshal(v.(proto.Message)) +} + +func (protoCodec) Unmarshal(data []byte, v interface{}) error { + return proto.Unmarshal(data, v.(proto.Message)) +} + +func (protoCodec) String() string { + return "proto" +} diff --git a/proxy_codec_test.go b/proxy/codec_test.go similarity index 90% rename from proxy_codec_test.go rename to proxy/codec_test.go index cc2900e..dd3893c 100644 --- a/proxy_codec_test.go +++ b/proxy/codec_test.go @@ -2,13 +2,14 @@ package proxy import ( "testing" + "github.com/stretchr/testify/require" ) -func TestProxyCodec_ReadYourWrites(t *testing.T) { +func TestCodec_ReadYourWrites(t *testing.T) { framePtr := &frame{} data := []byte{0xDE, 0xAD, 0xBE, 0xEF} - codec := codec{} + codec := rawCodec{} require.NoError(t, codec.Unmarshal(data, framePtr), "unmarshalling must go ok") out, err := codec.Marshal(framePtr) require.NoError(t, err, "no marshal error") diff --git a/proxy/director.go b/proxy/director.go new file mode 100644 index 0000000..2e1c916 --- /dev/null +++ b/proxy/director.go @@ -0,0 +1,20 @@ +// Copyright 2017 Michal Witkowski. All Rights Reserved. +// See LICENSE for licensing terms. + +package proxy + +import ( + "golang.org/x/net/context" + "google.golang.org/grpc" +) + +// StreamDirector returns a gRPC ClientConn to be used to forward the call to. +// +// The presence of the `Context` allows for rich filtering, e.g. based on Metadata (headers). +// If no handling is meant to be done, a `codes.NotImplemented` gRPC error should be returned. +// +// It is worth noting that the StreamDirector will be fired *after* all server-side stream interceptors +// are invoked. So decisions around authorization, monitoring etc. are better to be handled there. +// +// See the rather rich example. +type StreamDirector func(ctx context.Context, fullMethodName string) (*grpc.ClientConn, error) diff --git a/proxy/doc.go b/proxy/doc.go new file mode 100644 index 0000000..01328f3 --- /dev/null +++ b/proxy/doc.go @@ -0,0 +1,15 @@ +// Copyright 2017 Michal Witkowski. All Rights Reserved. +// See LICENSE for licensing terms. + +/* +Package proxy provides a reverse proxy handler for gRPC. + +The implementation allows a `grpc.Server` to pass a received ServerStream to a ClientStream without understanding +the semantics of the messages exchanged. It basically provides a transparent reverse-proxy. + +This package is intentionally generic, exposing a `StreamDirector` function that allows users of this package +to implement whatever logic of backend-picking, dialing and service verification to perform. + +See examples on documented functions. +*/ +package proxy diff --git a/proxy/examples_test.go b/proxy/examples_test.go new file mode 100644 index 0000000..ad3dbb4 --- /dev/null +++ b/proxy/examples_test.go @@ -0,0 +1,55 @@ +// Copyright 2017 Michal Witkowski. All Rights Reserved. +// See LICENSE for licensing terms. + +package proxy_test + +import ( + "strings" + + "github.com/mwitkow/grpc-proxy/proxy" + "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" +) + +var ( + director proxy.StreamDirector +) + +func ExampleRegisterService() { + // A gRPC server with the proxying codec enabled. + server := grpc.NewServer(grpc.CustomCodec(proxy.Codec())) + // Register a TestService with 4 of its methods explicitly. + proxy.RegisterService(server, director, + "mwitkow.testproto.TestService", + "PingEmpty", "Ping", "PingError", "PingList") +} + +func ExampleTransparentHandler() { + grpc.NewServer( + grpc.CustomCodec(proxy.Codec()), + grpc.UnknownServiceHandler(proxy.TransparentHandler(director))) +} + +// Provide sa simple example of a director that shields internal services and dials a staging or production backend. +// This is a *very naive* implementation that creates a new connection on every request. Consider using pooling. +func ExampleStreamDirector() { + director = func(ctx context.Context, fullMethodName string) (*grpc.ClientConn, error) { + // Make sure we never forward internal services. + if strings.HasPrefix(fullMethodName, "/com.example.internal.") { + return nil, grpc.Errorf(codes.Unimplemented, "Unknown method") + } + md, ok := metadata.FromContext(ctx) + if ok { + // Decide on which backend to dial + if val, exists := md[":authority"]; exists && val[0] == "staging.api.example.com" { + // Make sure we use DialContext so the dialing can be cancelled/time out together with the context. + return grpc.DialContext(ctx, "api-service.staging.svc.local", grpc.WithCodec(proxy.Codec())) + } else if val, exists := md[":authority"]; exists && val[0] == "api.example.com" { + return grpc.DialContext(ctx, "api-service.prod.svc.local", grpc.WithCodec(proxy.Codec())) + } + } + return nil, grpc.Errorf(codes.Unimplemented, "Unknown method") + } +} diff --git a/proxy.go b/proxy/handler.go similarity index 63% rename from proxy.go rename to proxy/handler.go index f674d77..bacf7d8 100644 --- a/proxy.go +++ b/proxy/handler.go @@ -1,3 +1,6 @@ +// Copyright 2017 Michal Witkowski. All Rights Reserved. +// See LICENSE for licensing terms. + package proxy import ( @@ -15,8 +18,12 @@ var ( } ) -func RegisterProxyStreams(server *grpc.Server, director StreamDirector, serviceName string, methodNames ...string) { - streamer := &proxyStreamer{director} +// RegisterService sets up a proxy handler for a particular gRPC service and method. +// The behaviour is the same as if you were registering a handler method, e.g. from a codegenerated pb.go file. +// +// This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption. +func RegisterService(server *grpc.Server, director StreamDirector, serviceName string, methodNames ...string) { + streamer := &handler{director} fakeDesc := &grpc.ServiceDesc{ ServiceName: serviceName, HandlerType: (*interface{})(nil), @@ -33,25 +40,36 @@ func RegisterProxyStreams(server *grpc.Server, director StreamDirector, serviceN server.RegisterService(fakeDesc, streamer) } -type proxyStreamer struct { +// TransparentHandler returns a handler that attempts to proxy all requests that are not registered in the server. +// The indented use here is as a transparent proxy, where the server doesn't know about the services implemented by the +// backends. It should be used as a `grpc.UnknownServiceHandler`. +// +// This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption. +func TransparentHandler(director StreamDirector) grpc.StreamHandler { + streamer := &handler{director} + return streamer.handler +} + +type handler struct { director StreamDirector } -// proxyStreamHandler is where the real magic of proxying happens. +// handler is where the real magic of proxying happens. // It is invoked like any gRPC server stream and uses the gRPC server framing to get and receive bytes from the wire, // forwarding it to a ClientStream established against the relevant ClientConn. -func (s *proxyStreamer) handler(srv interface{}, serverStream grpc.ServerStream) error { - backendConn, err := s.director(serverStream.Context()) - if err != nil { - return err - } +func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error { // little bit of gRPC internals never hurt anyone lowLevelServerStream, ok := transport.StreamFromContext(serverStream.Context()) if !ok { return grpc.Errorf(codes.Internal, "lowLevelServerStream not exists in context") } + fullMethodName := lowLevelServerStream.Method() + backendConn, err := s.director(serverStream.Context(), fullMethodName) + if err != nil { + return err + } // TODO(mwitkow): Add a `forwarded` header to metadata, https://en.wikipedia.org/wiki/X-Forwarded-For. - clientStream, err := grpc.NewClientStream(serverStream.Context(), clientStreamDescForProxying, backendConn, lowLevelServerStream.Method()) + clientStream, err := grpc.NewClientStream(serverStream.Context(), clientStreamDescForProxying, backendConn, fullMethodName) if err != nil { return err } @@ -59,7 +77,7 @@ func (s *proxyStreamer) handler(srv interface{}, serverStream grpc.ServerStream) s2cErr := <-s.forwardServerToClient(serverStream, clientStream) c2sErr := <-s.forwardClientToServer(clientStream, serverStream) if s2cErr != io.EOF { - return grpc.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr, c2sErr) + return grpc.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr) } serverStream.SetTrailer(clientStream.Trailer()) // c2sErr will contain RPC error from client code. If not io.EOF return the RPC error as server stream error. @@ -69,7 +87,7 @@ func (s *proxyStreamer) handler(srv interface{}, serverStream grpc.ServerStream) return nil } -func (s *proxyStreamer) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerStream) chan error { +func (s *handler) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerStream) chan error { ret := make(chan error, 1) go func() { f := &frame{} @@ -102,7 +120,7 @@ func (s *proxyStreamer) forwardClientToServer(src grpc.ClientStream, dst grpc.Se return ret } -func (s *proxyStreamer) forwardServerToClient(src grpc.ServerStream, dst grpc.ClientStream) chan error { +func (s *handler) forwardServerToClient(src grpc.ServerStream, dst grpc.ClientStream) chan error { ret := make(chan error, 1) go func() { f := &frame{} diff --git a/proxy_test.go b/proxy/handler_test.go similarity index 89% rename from proxy_test.go rename to proxy/handler_test.go index 4cdd3d8..014d3d4 100644 --- a/proxy_test.go +++ b/proxy/handler_test.go @@ -1,31 +1,18 @@ -//// Copyright © 2015 Michal Witkowski -//// -//// Licensed under the Apache License, Version 2.0 (the "License"); -//// you may not use this file except in compliance with the License. -//// You may obtain a copy of the License at -//// http://www.apache.org/licenses/LICENSE-2.0 -//// -//// Unless required by applicable law or agreed to in writing, software -//// distributed under the License is distributed on an "AS IS" BASIS, -//// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -//// See the License for the specific language governing permissions and -//// limitations under the License. -// +// Copyright 2017 Michal Witkowski. All Rights Reserved. +// See LICENSE for licensing terms. + package proxy_test import ( + "io" + "log" "net" + "os" "strings" "testing" "time" - "github.com/mwitkow/grpc-proxy" - pb "github.com/mwitkow/grpc-proxy/testservice" - - "io" - "log" - "os" - + "github.com/mwitkow/grpc-proxy/proxy" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -34,6 +21,8 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/metadata" + + pb "github.com/mwitkow/grpc-proxy/testservice" ) const ( @@ -173,9 +162,9 @@ func (s *ProxyHappySuite) SetupSuite() { pb.RegisterTestServiceServer(s.server, &assertingService{t: s.T()}) // Setup of the proxy's Director. - proxyClientConn, err := grpc.Dial(s.serverListener.Addr().String(), grpc.WithInsecure(), proxy.WithProxyCodec()) + proxyClientConn, err := grpc.Dial(s.serverListener.Addr().String(), grpc.WithInsecure(), grpc.WithCodec(proxy)) require.NoError(s.T(), err, "must not error on deferred client Dial") - director := func(ctx context.Context) (*grpc.ClientConn, error) { + director := func(ctx context.Context, fullName string) (*grpc.ClientConn, error) { md, ok := metadata.FromContext(ctx) if ok { if _, exists := md[rejectingMdKey]; exists { @@ -185,11 +174,13 @@ func (s *ProxyHappySuite) SetupSuite() { return proxyClientConn, nil } s.proxy = grpc.NewServer( - proxy.ProxyCodec(), + grpc.CustomCodec(proxy.Codec()), + grpc.UnknownServiceHandler(proxy.TransparentHandler(director)), ) - proxy.RegisterProxyStreams(s.proxy, director, + // Ping handler is handled as an explicit registration and not as a TransparentHandler. + proxy.RegisterService(s.proxy, director, "mwitkow.testproto.TestService", - "PingEmpty", "Ping", "PingError", "PingList") + "Ping") // Start the serving loops. go func() { diff --git a/proxy_codec.go b/proxy_codec.go deleted file mode 100644 index abe969c..0000000 --- a/proxy_codec.go +++ /dev/null @@ -1,63 +0,0 @@ -package proxy - -import ( - "fmt" - "google.golang.org/grpc" - "github.com/golang/protobuf/proto" -) - -// ProxyCodec is custom codec for gRPC server that a no-op codec if the unmarshalling is done to/from bytes. -// This is required for proxy functionality (as the proxy doesn't know the types). But in case of methods implemented -// on the server, it falls back to the proto codec. -func ProxyCodec() grpc.ServerOption { - return grpc.CustomCodec(&codec{&protoCodec{}}) -} - -func WithProxyCodec() grpc.DialOption { - return grpc.WithCodec(&codec{&protoCodec{}}) -} - -type codec struct { - parentCodec grpc.Codec -} - -type frame struct { - payload []byte -} - -func (c *codec) Marshal(v interface{}) ([]byte, error) { - out, ok := v.(*frame) - if !ok { - return c.parentCodec.Marshal(v) - } - return out.payload, nil - -} - -func (c *codec) Unmarshal(data []byte, v interface{}) error { - dst, ok := v.(*frame) - if !ok { - return c.parentCodec.Unmarshal(data, v) - } - dst.payload = data - return nil -} - -func (c *codec) String() string { - return fmt.Sprintf("proxy>%s", c.parentCodec.String()) -} - -// protoCodec is a Codec implementation with protobuf. It is the default codec for gRPC. -type protoCodec struct{} - -func (protoCodec) Marshal(v interface{}) ([]byte, error) { - return proto.Marshal(v.(proto.Message)) -} - -func (protoCodec) Unmarshal(data []byte, v interface{}) error { - return proto.Unmarshal(data, v.(proto.Message)) -} - -func (protoCodec) String() string { - return "proto" -}