Skip to content
This repository has been archived by the owner on Dec 2, 2023. It is now read-only.

[WIP] Adjust architecture #553

Merged
merged 20 commits into from
Jun 18, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Refactor business logic.
  • Loading branch information
cloudwebrtc committed Apr 27, 2021
commit 89523d9c05edb89f091960bbf4421f5982e0eda6
34 changes: 33 additions & 1 deletion cmd/biz/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,23 @@
package main

import (
"context"
"flag"
"fmt"
"os"
"strings"

log "github.com/pion/ion-log"
pb "github.com/pion/ion-sfu/cmd/signal/grpc/proto"
"github.com/pion/ion/cmd/biz/proxy"
"github.com/pion/ion/cmd/biz/server"
bizpb "github.com/pion/ion/pkg/grpc/biz"
"github.com/pion/ion/pkg/node/biz"
"github.com/spf13/viper"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)

var (
Expand Down Expand Up @@ -100,7 +107,31 @@ func main() {
options.TLSAddr = addr
}

s := server.NewWrapperedGRPCWebServer(options)
director := func(ctx context.Context, fullMethodName string) (context.Context, grpc.ClientConnInterface, error) {
// Make sure we never forward internal services.
if strings.HasPrefix(fullMethodName, "/com.example.internal.") {
return ctx, nil, status.Errorf(codes.Unimplemented, "Unknown method")
}
md, ok := metadata.FromIncomingContext(ctx)
if ok {
// Decide on which backend to dial
if val, exists := md[":authority"]; exists && val[0] == "staging.api.example.com" {
// Make sure we use DialContext so the dialing can be cancelled/time out together with the context.
conn, err := grpc.DialContext(ctx, "api-service.staging.svc.local", grpc.WithCodec(proxy.Codec()))
return ctx, conn, err
} else if val, exists := md[":authority"]; exists && val[0] == "api.example.com" {
conn, err := grpc.DialContext(ctx, "api-service.prod.svc.local", grpc.WithCodec(proxy.Codec()))
return ctx, conn, err
}
}
return ctx, nil, status.Errorf(codes.Unimplemented, "Unknown method")
}

srv := grpc.NewServer(
grpc.CustomCodec(proxy.Codec()),
grpc.UnknownServiceHandler(proxy.TransparentHandler(director)))

s := server.NewWrapperedGRPCWebServer(options, srv)

node := biz.NewBIZ("biz01")
if err := node.Start(conf); err != nil {
Expand All @@ -114,6 +145,7 @@ func main() {
sfusig := &biz.SFUSignalBridge{
BizServer: node.Service(),
}

s.GRPCServer.RegisterService(&pb.SFU_ServiceDesc, sfusig)

if err := s.Serve(); err != nil {
Expand Down
70 changes: 70 additions & 0 deletions cmd/biz/proxy/codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package proxy

import (
"fmt"

"github.com/golang/protobuf/proto"
"google.golang.org/grpc"
)

// Codec returns a proxying grpc.Codec with the default protobuf codec as parent.
//
// See CodecWithParent.
func Codec() grpc.Codec {
return CodecWithParent(&protoCodec{})
}

// CodecWithParent returns a proxying grpc.Codec with a user provided codec as parent.
//
// This codec is *crucial* to the functioning of the proxy. It allows the proxy server to be oblivious
// to the schema of the forwarded messages. It basically treats a gRPC message frame as raw bytes.
// However, if the server handler, or the client caller are not proxy-internal functions it will fall back
// to trying to decode the message using a fallback codec.
func CodecWithParent(fallback grpc.Codec) grpc.Codec {
return &rawCodec{fallback}
}

type rawCodec struct {
parentCodec grpc.Codec
}

type frame struct {
payload []byte
}

func (c *rawCodec) Marshal(v interface{}) ([]byte, error) {
out, ok := v.(*frame)
if !ok {
return c.parentCodec.Marshal(v)
}
return out.payload, nil

}

func (c *rawCodec) Unmarshal(data []byte, v interface{}) error {
dst, ok := v.(*frame)
if !ok {
return c.parentCodec.Unmarshal(data, v)
}
dst.payload = data
return nil
}

func (c *rawCodec) String() string {
return fmt.Sprintf("proxy>%s", c.parentCodec.String())
}

// protoCodec is a Codec implementation with protobuf. It is the default rawCodec for gRPC.
type protoCodec struct{}

func (protoCodec) Marshal(v interface{}) ([]byte, error) {
return proto.Marshal(v.(proto.Message))
}

func (protoCodec) Unmarshal(data []byte, v interface{}) error {
return proto.Unmarshal(data, v.(proto.Message))
}

func (protoCodec) String() string {
return "proto"
}
24 changes: 24 additions & 0 deletions cmd/biz/proxy/director.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2017 Michal Witkowski. All Rights Reserved.
// See LICENSE for licensing terms.

package proxy

import (
"golang.org/x/net/context"
"google.golang.org/grpc"
)

// StreamDirector returns a gRPC ClientConn to be used to forward the call to.
//
// The presence of the `Context` allows for rich filtering, e.g. based on Metadata (headers).
// If no handling is meant to be done, a `codes.NotImplemented` gRPC error should be returned.
//
// The context returned from this function should be the context for the *outgoing* (to backend) call. In case you want
// to forward any Metadata between the inbound request and outbound requests, you should do it manually. However, you
// *must* propagate the cancel function (`context.WithCancel`) of the inbound context to the one returned.
//
// It is worth noting that the StreamDirector will be fired *after* all server-side stream interceptors
// are invoked. So decisions around authorization, monitoring etc. are better to be handled there.
//
// See the rather rich example.
type StreamDirector func(ctx context.Context, fullMethodName string) (context.Context, grpc.ClientConnInterface, error)
178 changes: 178 additions & 0 deletions cmd/biz/proxy/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
// Copyright 2017 Michal Witkowski. All Rights Reserved.
// See LICENSE for licensing terms.

package proxy

import (
"io"

nrpc "github.com/cloudwebrtc/nats-grpc/pkg/rpc"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

var (
clientStreamDescForProxying = &grpc.StreamDesc{
ServerStreams: true,
ClientStreams: true,
}
)

// RegisterService sets up a proxy handler for a particular gRPC service and method.
// The behaviour is the same as if you were registering a handler method, e.g. from a codegenerated pb.go file.
//
// This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption.
func RegisterService(server *grpc.Server, director StreamDirector, serviceName string, methodNames ...string) {
streamer := &handler{director}
fakeDesc := &grpc.ServiceDesc{
ServiceName: serviceName,
HandlerType: (*interface{})(nil),
}
for _, m := range methodNames {
streamDesc := grpc.StreamDesc{
StreamName: m,
Handler: streamer.handler,
ServerStreams: true,
ClientStreams: true,
}
fakeDesc.Streams = append(fakeDesc.Streams, streamDesc)
}
server.RegisterService(fakeDesc, streamer)
}

// TransparentHandler returns a handler that attempts to proxy all requests that are not registered in the server.
// The indented use here is as a transparent proxy, where the server doesn't know about the services implemented by the
// backends. It should be used as a `grpc.UnknownServiceHandler`.
//
// This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption.
func TransparentHandler(director StreamDirector) grpc.StreamHandler {
streamer := &handler{director}
return streamer.handler
}

type handler struct {
director StreamDirector
}

// handler is where the real magic of proxying happens.
// It is invoked like any gRPC server stream and uses the gRPC server framing to get and receive bytes from the wire,
// forwarding it to a ClientStream established against the relevant ClientConn.
func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error {
// little bit of gRPC internals never hurt anyone
fullMethodName, ok := grpc.MethodFromServerStream(serverStream)
if !ok {
return status.Errorf(codes.Internal, "lowLevelServerStream not exists in context")
}
// We require that the director's returned context inherits from the serverStream.Context().
outgoingCtx, clientConnIf, err := s.director(serverStream.Context(), fullMethodName)
if err != nil {
return err
}

clientCtx, clientCancel := context.WithCancel(outgoingCtx)

// TODO(mwitkow): Add a `forwarded` header to metadata, https://en.wikipedia.org/wiki/X-Forwarded-For.

var clientStream grpc.ClientStream
nrpcClient, ok := clientConnIf.(*nrpc.Client)
if ok {
clientStream, err = nrpcClient.NewStream(clientCtx, clientStreamDescForProxying, fullMethodName)
if err != nil {
return err
}
}

gpcClientConn, ok := clientConnIf.(*grpc.ClientConn)
if ok {
clientStream, err = grpc.NewClientStream(clientCtx, clientStreamDescForProxying, gpcClientConn, fullMethodName)
if err != nil {
return err
}
}

// Explicitly *do not close* s2cErrChan and c2sErrChan, otherwise the select below will not terminate.
// Channels do not have to be closed, it is just a control flow mechanism, see
// https://groups.google.com/forum/#!msg/golang-nuts/pZwdYRGxCIk/qpbHxRRPJdUJ
s2cErrChan := s.forwardServerToClient(serverStream, clientStream)
c2sErrChan := s.forwardClientToServer(clientStream, serverStream)
// We don't know which side is going to stop sending first, so we need a select between the two.
for i := 0; i < 2; i++ {
select {
case s2cErr := <-s2cErrChan:
if s2cErr == io.EOF {
// this is the happy case where the sender has encountered io.EOF, and won't be sending anymore./
// the clientStream>serverStream may continue pumping though.
clientStream.CloseSend()
} else {
// however, we may have gotten a receive error (stream disconnected, a read error etc) in which case we need
// to cancel the clientStream to the backend, let all of its goroutines be freed up by the CancelFunc and
// exit with an error to the stack
clientCancel()
return status.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr)
}
case c2sErr := <-c2sErrChan:
// This happens when the clientStream has nothing else to offer (io.EOF), returned a gRPC error. In those two
// cases we may have received Trailers as part of the call. In case of other errors (stream closed) the trailers
// will be nil.
serverStream.SetTrailer(clientStream.Trailer())
// c2sErr will contain RPC error from client code. If not io.EOF return the RPC error as server stream error.
if c2sErr != io.EOF {
return c2sErr
}
return nil
}
}
return status.Errorf(codes.Internal, "gRPC proxying should never reach this stage.")
}

func (s *handler) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerStream) chan error {
ret := make(chan error, 1)
go func() {
f := &frame{}
for i := 0; ; i++ {
if err := src.RecvMsg(f); err != nil {
ret <- err // this can be io.EOF which is happy case
break
}
if i == 0 {
// This is a bit of a hack, but client to server headers are only readable after first client msg is
// received but must be written to server stream before the first msg is flushed.
// This is the only place to do it nicely.
md, err := src.Header()
if err != nil {
ret <- err
break
}
if err := dst.SendHeader(md); err != nil {
ret <- err
break
}
}
if err := dst.SendMsg(f); err != nil {
ret <- err
break
}
}
}()
return ret
}

func (s *handler) forwardServerToClient(src grpc.ServerStream, dst grpc.ClientStream) chan error {
ret := make(chan error, 1)
go func() {
f := &frame{}
for i := 0; ; i++ {
if err := src.RecvMsg(f); err != nil {
ret <- err // this can be io.EOF which is happy case
break
}
if err := dst.SendMsg(f); err != nil {
ret <- err
break
}
}
}()
return ret
}
4 changes: 2 additions & 2 deletions cmd/biz/server/wrapped.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ type WrapperedGRPCWebServer struct {
GRPCServer *grpc.Server
}

func NewWrapperedGRPCWebServer(options WrapperedServerOptions) *WrapperedGRPCWebServer {
func NewWrapperedGRPCWebServer(options WrapperedServerOptions, s *grpc.Server) *WrapperedGRPCWebServer {
return &WrapperedGRPCWebServer{
options: options,
GRPCServer: grpc.NewServer(),
GRPCServer: s,
}
}

Expand Down
6 changes: 5 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ module github.com/pion/ion

go 1.13

replace github.com/cloudwebrtc/nats-grpc => ../ion-project/nats-grpc

require (
github.com/cloudwebrtc/nats-discovery v0.1.0
github.com/cloudwebrtc/nats-grpc v0.1.3
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/go-redis/redis/v7 v7.4.0
github.com/golang/protobuf v1.4.3
github.com/improbable-eng/grpc-web v0.13.0
github.com/nats-io/nats-server/v2 v2.1.9
github.com/nats-io/nats.go v1.10.0
Expand All @@ -20,8 +23,9 @@ require (
github.com/square/go-jose/v3 v3.0.0-20200630053402-0a67ce9b0693
github.com/stretchr/testify v1.7.0
github.com/tj/assert v0.0.3
golang.org/x/net v0.0.0-20210119194325-5f4716e94777
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4 // indirect
google.golang.org/grpc v1.35.0
google.golang.org/protobuf v1.25.0
google.golang.org/protobuf v1.25.1-0.20200805231151-a709e31e5d12
)
Loading