Skip to content

Commit

Permalink
Introduce rpc.Params (cadence-workflow#4517)
Browse files Browse the repository at this point in the history
  • Loading branch information
vytautas-karpavicius authored Sep 29, 2021
1 parent f2ff1c3 commit 580c448
Show file tree
Hide file tree
Showing 10 changed files with 314 additions and 183 deletions.
3 changes: 0 additions & 3 deletions cmd/server/cadence/cadence.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,6 @@ func startHandler(c *cli.Context) {
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, syscall.SIGTERM)
for _, svc := range services {
if _, ok := cfg.Services[svc]; !ok {
log.Fatalf("`%v` service missing config", svc)
}
server := newServer(svc, &cfg)
daemons = append(daemons, server)
server.Start()
Expand Down
17 changes: 12 additions & 5 deletions cmd/server/cadence/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,6 @@ func newServer(service string, cfg *config.Config) common.Daemon {

// Start starts the server
func (s *server) Start() {
if _, ok := s.cfg.Services[s.name]; !ok {
log.Fatalf("`%v` service missing config", s)
}
s.daemon = s.startService()
}

Expand All @@ -97,6 +94,11 @@ func (s *server) Stop() {

// startService starts a service with the given name and config
func (s *server) startService() common.Daemon {
svcCfg, err := s.cfg.GetServiceConfig(s.name)
if err != nil {
log.Fatal(err.Error())
}

params := resource.Params{}
params.Name = service.FullName(s.name)

Expand Down Expand Up @@ -142,9 +144,14 @@ func (s *server) startService() common.Daemon {
dynamicconfig.ClusterNameFilter(clusterGroupMetadata.CurrentClusterName),
)

svcCfg := s.cfg.Services[s.name]
params.MetricScope = svcCfg.Metrics.NewScope(params.Logger, params.Name)
params.RPCFactory = rpc.NewFactory(params.Name, svcCfg.RPC, params.Logger, rpc.NewGRPCPorts(s.cfg))

rpcParams, err := rpc.NewParams(params.Name, s.cfg)
if err != nil {
log.Fatalf("error creating rpc factory params: %v", err)
}
params.RPCFactory = rpc.NewFactory(params.Logger, rpcParams)

params.MembershipFactory, err = s.cfg.Ringpop.NewFactory(
params.RPCFactory.GetDispatcher(),
params.Name,
Expand Down
11 changes: 11 additions & 0 deletions common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package config

import (
"encoding/json"
"fmt"
"log"
"time"

Expand All @@ -31,6 +32,7 @@ import (

"github.com/uber/cadence/common/dynamicconfig"
c "github.com/uber/cadence/common/dynamicconfig/configstore/config"
"github.com/uber/cadence/common/service"
)

type (
Expand Down Expand Up @@ -480,3 +482,12 @@ func (c *Config) String() string {
out, _ := json.MarshalIndent(c, "", " ")
return string(out)
}

func (c *Config) GetServiceConfig(serviceName string) (Service, error) {
shortName := service.ShortName(serviceName)
serviceConfig, ok := c.Services[shortName]
if !ok {
return Service{}, fmt.Errorf("no config section for service: %s", shortName)
}
return serviceConfig, nil
}
12 changes: 12 additions & 0 deletions common/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/service"
)

func TestToString(t *testing.T) {
Expand Down Expand Up @@ -102,3 +103,14 @@ func TestConfigErrorInAuthorizationConfig(t *testing.T) {
err := cfg.ValidateAndFillDefaults()
require.Error(t, err)
}

func TestGetServiceConfig(t *testing.T) {
cfg := Config{}
_, err := cfg.GetServiceConfig(service.Frontend)
assert.EqualError(t, err, "no config section for service: frontend")

cfg = Config{Services: map[string]Service{"frontend": {RPC: RPC{GRPCPort: 123}}}}
svc, err := cfg.GetServiceConfig(service.Frontend)
assert.NoError(t, err)
assert.NotEmpty(t, svc)
}
139 changes: 60 additions & 79 deletions common/rpc/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,89 +21,93 @@
package rpc

import (
"fmt"
"net"
"sync"

"go.uber.org/yarpc"
"go.uber.org/yarpc/api/transport"
"go.uber.org/yarpc/transport/grpc"
"go.uber.org/yarpc/transport/tchannel"

"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
)

// Factory is an implementation of service.RPCFactory interface
// Factory is an implementation of common.RPCFactory interface
type Factory struct {
config config.RPC
serviceName string
tchannel *tchannel.ChannelTransport
grpc *grpc.Transport
logger log.Logger
grpcPorts GRPCPorts

sync.Mutex
dispatcher *yarpc.Dispatcher
logger log.Logger
hostAddressMapper HostAddressMapper
tchannel *tchannel.Transport
grpc *grpc.Transport
dispatcher *yarpc.Dispatcher
}

// NewFactory builds a new rpc.Factory
// conforming to the underlying configuration
func NewFactory(service string, cfg config.RPC, logger log.Logger, grpcPorts GRPCPorts) *Factory {
return &Factory{config: cfg, serviceName: service, logger: logger, grpcPorts: grpcPorts}
}

// GetDispatcher return a cached dispatcher
func (d *Factory) GetDispatcher() *yarpc.Dispatcher {
d.Lock()
defer d.Unlock()

if d.dispatcher != nil {
return d.dispatcher
}

d.dispatcher = d.createInboundDispatcher()
return d.dispatcher
}

// createInboundDispatcher creates a dispatcher for inbound
func (d *Factory) createInboundDispatcher() *yarpc.Dispatcher {
// Setup dispatcher for onebox
var err error
func NewFactory(logger log.Logger, p Params) *Factory {
inbounds := yarpc.Inbounds{}

hostAddress := fmt.Sprintf("%v:%v", d.getListenIP(), d.config.Port)
d.tchannel, err = tchannel.NewChannelTransport(
tchannel.ServiceName(d.serviceName),
tchannel.ListenAddr(hostAddress))
// Create TChannel transport
// This is here only because ringpop extracts inbound from the dispatcher and expects tchannel.ChannelTransport,
// everywhere else we use regular tchannel.Transport.
ch, err := tchannel.NewChannelTransport(
tchannel.ServiceName(p.ServiceName),
tchannel.ListenAddr(p.TChannelAddress))
if err != nil {
logger.Fatal("Failed to create transport channel", tag.Error(err))
}
tchannel, err := tchannel.NewTransport(tchannel.ServiceName(p.ServiceName))
if err != nil {
d.logger.Fatal("Failed to create transport channel", tag.Error(err))
logger.Fatal("Failed to create tchannel transport", tag.Error(err))
}
inbounds = append(inbounds, d.tchannel.NewInbound())
d.logger.Info("Listening for TChannel requests", tag.Address(hostAddress))

inbounds = append(inbounds, ch.NewInbound())
logger.Info("Listening for TChannel requests", tag.Address(p.TChannelAddress))

// Create gRPC transport
var options []grpc.TransportOption
if d.config.GRPCMaxMsgSize > 0 {
options = append(options, grpc.ServerMaxRecvMsgSize(d.config.GRPCMaxMsgSize))
options = append(options, grpc.ClientMaxRecvMsgSize(d.config.GRPCMaxMsgSize))
if p.GRPCMaxMsgSize > 0 {
options = append(options, grpc.ServerMaxRecvMsgSize(p.GRPCMaxMsgSize))
options = append(options, grpc.ClientMaxRecvMsgSize(p.GRPCMaxMsgSize))
}
d.grpc = grpc.NewTransport(options...)
if d.config.GRPCPort > 0 {
grpcAddress := fmt.Sprintf("%v:%v", d.getListenIP(), d.config.GRPCPort)
listener, err := net.Listen("tcp", grpcAddress)
grpc := grpc.NewTransport(options...)
if len(p.GRPCAddress) > 0 {
listener, err := net.Listen("tcp", p.GRPCAddress)
if err != nil {
d.logger.Fatal("Failed to listen on GRPC port", tag.Error(err))
logger.Fatal("Failed to listen on GRPC port", tag.Error(err))
}

inbounds = append(inbounds, d.grpc.NewInbound(listener))
d.logger.Info("Listening for GRPC requests", tag.Address(grpcAddress))
inbounds = append(inbounds, grpc.NewInbound(listener))
logger.Info("Listening for GRPC requests", tag.Address(p.GRPCAddress))
}

return yarpc.NewDispatcher(yarpc.Config{
Name: d.serviceName,
Inbounds: inbounds,
// Create outbounds
outbounds := yarpc.Outbounds{}
if p.OutboundsBuilder != nil {
outbounds, err = p.OutboundsBuilder.Build(grpc, tchannel)
if err != nil {
logger.Fatal("Failed to create outbounds", tag.Error(err))
}
}

dispatcher := yarpc.NewDispatcher(yarpc.Config{
Name: p.ServiceName,
Inbounds: inbounds,
Outbounds: outbounds,
InboundMiddleware: p.InboundMiddleware,
OutboundMiddleware: p.OutboundMiddleware,
})

return &Factory{
logger: logger,
hostAddressMapper: p.HostAddressMapper,
tchannel: tchannel,
grpc: grpc,
dispatcher: dispatcher,
}
}

// GetDispatcher return a cached dispatcher
func (d *Factory) GetDispatcher() *yarpc.Dispatcher {
return d.dispatcher
}

// CreateDispatcherForOutbound creates a dispatcher for outbound connection
Expand All @@ -126,7 +130,7 @@ func (d *Factory) CreateGRPCDispatcherForOutbound(

// ReplaceGRPCPort replaces port in the address to grpc for a given service
func (d *Factory) ReplaceGRPCPort(serviceName, hostAddress string) (string, error) {
return d.grpcPorts.GetGRPCAddress(serviceName, hostAddress)
return d.hostAddressMapper.GetGRPCAddress(serviceName, hostAddress)
}

func (d *Factory) createOutboundDispatcher(
Expand All @@ -150,26 +154,3 @@ func (d *Factory) createOutboundDispatcher(
}
return dispatcher, nil
}

func (d *Factory) getListenIP() net.IP {
if d.config.BindOnLocalHost && len(d.config.BindOnIP) > 0 {
d.logger.Fatal("ListenIP failed, bindOnLocalHost and bindOnIP are mutually exclusive")
}

if d.config.BindOnLocalHost {
return net.IPv4(127, 0, 0, 1)
}

if len(d.config.BindOnIP) > 0 {
ip := net.ParseIP(d.config.BindOnIP)
if ip != nil && ip.To4() != nil {
return ip.To4()
}
d.logger.Fatal("ListenIP failed, unable to parse bindOnIP value or it is not an IPv4 address", tag.Address(d.config.BindOnIP))
}
ip, err := ListenIP()
if err != nil {
d.logger.Fatal("ListenIP failed", tag.Error(err))
}
return ip
}
8 changes: 7 additions & 1 deletion common/rpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,13 @@ import (
"github.com/uber/cadence/common/service"
)

type GRPCPorts map[string]int
type (
HostAddressMapper interface {
GetGRPCAddress(service, hostAddress string) (string, error)
}

GRPCPorts map[string]int
)

func NewGRPCPorts(c *config.Config) GRPCPorts {
grpcPorts := map[string]int{}
Expand Down
32 changes: 32 additions & 0 deletions common/rpc/outbounds.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package rpc

import (
"go.uber.org/yarpc"
"go.uber.org/yarpc/transport/grpc"
"go.uber.org/yarpc/transport/tchannel"
)

// OutboundsBuilder allows defining outbounds for the dispatcher
type OutboundsBuilder interface {
Build(*grpc.Transport, *tchannel.Transport) (yarpc.Outbounds, error)
}
Loading

0 comments on commit 580c448

Please sign in to comment.