Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use named port to select transport for outbound calls #4749

Merged
merged 16 commits into from
Mar 3, 2022
Prev Previous commit
Next Next commit
implement lookup by address for simple resolver
  • Loading branch information
mantas-sidlauskas committed Mar 3, 2022
commit 18dc6eba1533da0deefeb44ed9506acee8c833a6
4 changes: 2 additions & 2 deletions client/history/peerResolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (pr PeerResolver) FromShardID(shardID int) (string, error) {
if err != nil {
return "", err
}
return host.GetNamedAddress(pr.namedPort), nil
return host.GetNamedAddress(pr.namedPort)
}

// FromHostAddress resolves the final history peer responsible for the given host address.
Expand All @@ -80,5 +80,5 @@ func (pr PeerResolver) FromHostAddress(hostAddress string) (string, error) {
if err != nil {
return "", err
}
return host.GetNamedAddress(pr.namedPort), nil
return host.GetNamedAddress(pr.namedPort)
}
4 changes: 3 additions & 1 deletion client/matching/peerResolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,7 @@ func (pr PeerResolver) FromHostAddress(hostAddress string) (string, error) {
if err != nil {
return "", err
}
return host.GetNamedAddress(pr.namedPort), nil

return host.GetNamedAddress(pr.namedPort)

}
8 changes: 4 additions & 4 deletions common/membership/hostinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,12 @@ func (hi HostInfo) GetAddress() string {
}

// GetNamedAddress returns the ip:port address
func (hi HostInfo) GetNamedAddress(port string) string {
func (hi HostInfo) GetNamedAddress(port string) (string, error) {
if port, set := hi.portMap[port]; set {

return fmt.Sprintf("%s:%d", hi.ip, port)
return fmt.Sprintf("%s:%d", hi.ip, port), nil
}
return ""

return "", fmt.Errorf("port %q is not set for %+v", port, hi)
}

// Belongs tells if ip:port is assigned to this member
Expand Down
2 changes: 1 addition & 1 deletion common/rpc/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type Factory struct {
// NewFactory builds a new rpc.Factory
func NewFactory(logger log.Logger, p Params) *Factory {
inbounds := yarpc.Inbounds{}

logger = logger.WithTags(tag.Service(p.ServiceName))
// Create TChannel transport
// This is here only because ringpop expects tchannel.ChannelTransport,
// everywhere else we use regular tchannel.Transport.
Expand Down
10 changes: 5 additions & 5 deletions host/client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ func (s *ClientIntegrationSuite) SetupSuite() {
s.worker = worker.New(s.wfService, s.domainName, s.taskList, worker.Options{})
if err := s.worker.Start(); err != nil {
s.Logger.Fatal("Error when start worker", tag.Error(err))
} else {
s.Logger.Info("Worker started")
}
}

Expand All @@ -99,12 +101,10 @@ func (s *ClientIntegrationSuite) TearDownSuite() {

func (s *ClientIntegrationSuite) buildServiceClient() (workflowserviceclient.Interface, error) {
cadenceClientName := "cadence-client"
cadenceFrontendService := service.Frontend
hostPort := "127.0.0.1:7114"
hostPort := "127.0.0.1:7104"
if TestFlags.FrontendAddr != "" {
hostPort = TestFlags.FrontendAddr
}

ch, err := tchannel.NewChannelTransport(tchannel.ServiceName(cadenceClientName))
if err != nil {
s.Logger.Fatal("Failed to create transport channel", tag.Error(err))
Expand All @@ -113,7 +113,7 @@ func (s *ClientIntegrationSuite) buildServiceClient() (workflowserviceclient.Int
dispatcher := yarpc.NewDispatcher(yarpc.Config{
Name: cadenceClientName,
Outbounds: yarpc.Outbounds{
cadenceFrontendService: {Unary: ch.NewSingleOutbound(hostPort)},
service.Frontend: {Unary: ch.NewSingleOutbound(hostPort)},
},
})
if dispatcher == nil {
Expand All @@ -123,7 +123,7 @@ func (s *ClientIntegrationSuite) buildServiceClient() (workflowserviceclient.Int
s.Logger.Fatal("Failed to create outbound transport channel", tag.Error(err))
}

return workflowserviceclient.New(dispatcher.ClientConfig(cadenceFrontendService)), nil
return workflowserviceclient.New(dispatcher.ClientConfig(service.Frontend)), nil
}

func (s *ClientIntegrationSuite) SetupTest() {
Expand Down
2 changes: 1 addition & 1 deletion host/integrationbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ type (
Logger log.Logger
domainName string
testRawHistoryDomainName string
foreignDomainName string
archivalDomainName string
foreignDomainName string
defaultTestCluster testcluster.PersistenceTestCluster
visibilityTestCluster testcluster.PersistenceTestCluster
}
Expand Down
9 changes: 9 additions & 0 deletions host/membership_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,5 +82,14 @@ func (s *simpleResolver) Members(service string) ([]membership.HostInfo, error)
}

func (s *simpleResolver) LookupByAddress(service string, address string) (membership.HostInfo, error) {
resolver, ok := s.resolvers[service]
if !ok {
return membership.HostInfo{}, fmt.Errorf("cannot lookup host for service %q", service)
}
for _, m := range resolver.Members() {
if belongs, err := m.Belongs(address); err == nil && belongs {
return m, nil
}
}
return membership.HostInfo{}, nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, shouldn't his return an error? Loop is finished but host not found yet.

}
24 changes: 20 additions & 4 deletions host/onebox.go
Original file line number Diff line number Diff line change
Expand Up @@ -798,17 +798,33 @@ func newPublicClient(dispatcher *yarpc.Dispatcher) cwsc.Interface {
}

func (c *cadenceImpl) newRPCFactory(serviceName string, host membership.HostInfo) common.RPCFactory {
tchannelAddres, err := host.GetNamedAddress(membership.PortTchannel)
if err != nil {
c.logger.Fatal("couldn't get tchannel port")
}

grpcAddress, err := host.GetNamedAddress(membership.PortGRPC)
if err != nil {
c.logger.Fatal("couldn't get grpc port")
}

frontendGrpcAddress, err := c.FrontendHost().GetNamedAddress(membership.PortGRPC)
if err != nil {
c.logger.Fatal("couldn't get frontend grpc port")
}

return rpc.NewFactory(c.logger, rpc.Params{
ServiceName: serviceName,
TChannelAddress: host.GetNamedAddress(membership.PortTchannel),
GRPCAddress: host.GetNamedAddress(membership.PortGRPC),
TChannelAddress: tchannelAddres,
GRPCAddress: grpcAddress,
InboundMiddleware: yarpc.InboundMiddleware{
Unary: &versionMiddleware{},
},

// For integration tests to generate client out of the same outbound.
OutboundsBuilder: rpc.CombineOutbounds(
&singleGRPCOutbound{testOutboundName(serviceName), serviceName, host.GetNamedAddress(membership.PortGRPC)},
&singleGRPCOutbound{rpc.OutboundPublicClient, service.Frontend, c.FrontendHost().GetNamedAddress(membership.PortGRPC)},
&singleGRPCOutbound{testOutboundName(serviceName), serviceName, grpcAddress},
&singleGRPCOutbound{rpc.OutboundPublicClient, service.Frontend, frontendGrpcAddress},
rpc.NewCrossDCOutbounds(c.clusterMetadata.GetAllClusterInfo(), rpc.NewDNSPeerChooserFactory(0, c.logger)),
rpc.NewDirectOutbound(service.History, true, nil),
rpc.NewDirectOutbound(service.Matching, true, nil),
Expand Down
7 changes: 5 additions & 2 deletions service/history/shard/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,9 +535,12 @@ func CreateShardOwnershipLostError(
currentHost membership.HostInfo,
ownerHost membership.HostInfo,
) *types.ShardOwnershipLostError {

address, err := ownerHost.GetNamedAddress(membership.PortTchannel)
if err != nil {
address = ownerHost.Identity()
}
return &types.ShardOwnershipLostError{
Message: fmt.Sprintf("Shard is not owned by host: %v", currentHost.Identity()),
Owner: ownerHost.GetNamedAddress(membership.PortTchannel),
Owner: address,
}
}