Skip to content

Commit 928d0ca

Browse files
committed
Add support for Custom LB Policies
1 parent e853dbf commit 928d0ca

26 files changed

+869
-676
lines changed

attributes/attributes.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
// later release.
2626
package attributes
2727

28+
import "strings"
29+
2830
// Attributes is an immutable struct for storing and retrieving generic
2931
// key/value pairs. Keys must be hashable, and users should define their own
3032
// types for keys. Values should not be modified after they are added to an
@@ -99,3 +101,22 @@ func (a *Attributes) Equal(o *Attributes) bool {
99101
}
100102
return true
101103
}
104+
105+
// String prints the attribute map. If any key or values throughout the map
106+
// implement fmt.Stringer, it calls that method and appends.
107+
func (a *Attributes) String() string {
108+
var sb strings.Builder
109+
sb.WriteString("{")
110+
for k, v := range a.m {
111+
if str, ok := k.(interface{ String() string }); ok {
112+
sb.WriteString(str.String())
113+
sb.WriteString(", ")
114+
}
115+
if str, ok := v.(interface{ String() string }); ok {
116+
sb.WriteString(str.String())
117+
sb.WriteString(", ")
118+
}
119+
}
120+
sb.WriteString("}")
121+
return sb.String()
122+
}

balancer/weightedroundrobin/weightedroundrobin.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
package weightedroundrobin
2121

2222
import (
23+
"fmt"
24+
2325
"google.golang.org/grpc/resolver"
2426
)
2527

@@ -66,3 +68,7 @@ func GetAddrInfo(addr resolver.Address) AddrInfo {
6668
ai, _ := v.(AddrInfo)
6769
return ai
6870
}
71+
72+
func (a AddrInfo) String() string {
73+
return fmt.Sprintf("Weight: %d", a.Weight)
74+
}

internal/testutils/xds/e2e/clientresources.go

Lines changed: 76 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -536,6 +536,17 @@ type EndpointOptions struct {
536536
// Ports is a set of ports on "localhost" where the endpoints corresponding
537537
// to this resource reside.
538538
Ports []uint32
539+
540+
// PortsInLocalities represent ports in different localities. The first
541+
// dimension represents a locality, and the second represents the ports
542+
// within that locality.
543+
PortsInLocalities [][]uint32
544+
545+
// LocalityWeights are the weights of localities specified in the first
546+
// dimension of PortsInLocalities. Must be the same length as the first
547+
// dimension of PortsInLocalities.
548+
LocalityWeights []uint32
549+
539550
// DropPercents is a map from drop category to a drop percentage. If unset,
540551
// no drops are configured.
541552
DropPercents map[string]int
@@ -550,6 +561,62 @@ func DefaultEndpoint(clusterName string, host string, ports []uint32) *v3endpoin
550561
})
551562
}
552563

564+
// EndpointResourceWithOptionsMultipleLocalities returns an xDS Endpoint
565+
// resource which specifies multiple localities, with the ports specified per
566+
// locality placed into each localities endpoints specification.
567+
func EndpointResourceWithOptionsMultipleLocalities(opts EndpointOptions) *v3endpointpb.ClusterLoadAssignment {
568+
var endpoints []*v3endpointpb.LocalityLbEndpoints
569+
for i, portsInLocality := range opts.PortsInLocalities {
570+
var lbEndpoints []*v3endpointpb.LbEndpoint
571+
for _, port := range portsInLocality {
572+
lbEndpoints = append(lbEndpoints, &v3endpointpb.LbEndpoint{
573+
HostIdentifier: &v3endpointpb.LbEndpoint_Endpoint{Endpoint: &v3endpointpb.Endpoint{
574+
Address: &v3corepb.Address{Address: &v3corepb.Address_SocketAddress{
575+
SocketAddress: &v3corepb.SocketAddress{
576+
Protocol: v3corepb.SocketAddress_TCP,
577+
Address: opts.Host,
578+
PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: port}},
579+
}},
580+
}},
581+
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: 1},
582+
})
583+
}
584+
585+
endpoints = append(endpoints, &v3endpointpb.LocalityLbEndpoints{
586+
Locality: &v3corepb.Locality{
587+
Region: fmt.Sprintf("region%d", i),
588+
Zone: fmt.Sprintf("zone%d", i),
589+
SubZone: fmt.Sprintf("subzone%d", i),
590+
},
591+
LbEndpoints: lbEndpoints,
592+
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: opts.LocalityWeights[i]},
593+
Priority: 0,
594+
})
595+
}
596+
597+
cla := &v3endpointpb.ClusterLoadAssignment{
598+
ClusterName: opts.ClusterName,
599+
Endpoints: endpoints,
600+
}
601+
602+
var drops []*v3endpointpb.ClusterLoadAssignment_Policy_DropOverload
603+
for category, val := range opts.DropPercents {
604+
drops = append(drops, &v3endpointpb.ClusterLoadAssignment_Policy_DropOverload{
605+
Category: category,
606+
DropPercentage: &v3typepb.FractionalPercent{
607+
Numerator: uint32(val),
608+
Denominator: v3typepb.FractionalPercent_HUNDRED,
609+
},
610+
})
611+
}
612+
if len(drops) != 0 {
613+
cla.Policy = &v3endpointpb.ClusterLoadAssignment_Policy{
614+
DropOverloads: drops,
615+
}
616+
}
617+
return cla
618+
}
619+
553620
// EndpointResourceWithOptions returns an xds Endpoint resource configured with
554621
// the provided options.
555622
func EndpointResourceWithOptions(opts EndpointOptions) *v3endpointpb.ClusterLoadAssignment {
@@ -564,18 +631,20 @@ func EndpointResourceWithOptions(opts EndpointOptions) *v3endpointpb.ClusterLoad
564631
PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: port}},
565632
}},
566633
}},
634+
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: 1},
567635
})
568636
}
569637
cla := &v3endpointpb.ClusterLoadAssignment{
570638
ClusterName: opts.ClusterName,
571-
Endpoints: []*v3endpointpb.LocalityLbEndpoints{{
572-
Locality: &v3corepb.Locality{SubZone: "subzone"},
573-
LbEndpoints: lbEndpoints,
574-
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: 1},
575-
Priority: 0,
576-
}},
639+
Endpoints: []*v3endpointpb.LocalityLbEndpoints{
640+
{
641+
Locality: &v3corepb.Locality{SubZone: "subzone"},
642+
LbEndpoints: lbEndpoints,
643+
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: 1},
644+
Priority: 0,
645+
},
646+
},
577647
}
578-
579648
var drops []*v3endpointpb.ClusterLoadAssignment_Policy_DropOverload
580649
for category, val := range opts.DropPercents {
581650
drops = append(drops, &v3endpointpb.ClusterLoadAssignment_Policy_DropOverload{

resolver/resolver.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,13 @@ package resolver
2222

2323
import (
2424
"context"
25+
"fmt"
2526
"net"
2627
"net/url"
2728
"strings"
2829

2930
"google.golang.org/grpc/attributes"
3031
"google.golang.org/grpc/credentials"
31-
"google.golang.org/grpc/internal/pretty"
3232
"google.golang.org/grpc/serviceconfig"
3333
)
3434

@@ -124,7 +124,7 @@ type Address struct {
124124
Attributes *attributes.Attributes
125125

126126
// BalancerAttributes contains arbitrary data about this address intended
127-
// for consumption by the LB policy. These attribes do not affect SubConn
127+
// for consumption by the LB policy. These attributes do not affect SubConn
128128
// creation, connection establishment, handshaking, etc.
129129
BalancerAttributes *attributes.Attributes
130130

@@ -151,7 +151,20 @@ func (a Address) Equal(o Address) bool {
151151

152152
// String returns JSON formatted string representation of the address.
153153
func (a Address) String() string {
154-
return pretty.ToJSON(a)
154+
var sb strings.Builder
155+
sb.WriteString(fmt.Sprintf("{Addr: %v, ", a.Addr))
156+
sb.WriteString(fmt.Sprintf("ServerName: %v, ", a.ServerName))
157+
var atrStr string
158+
if a.Attributes != nil {
159+
atrStr = a.Attributes.String()
160+
}
161+
sb.WriteString(fmt.Sprintf("Attributes: %v, ", atrStr))
162+
var balAtrStr string
163+
if a.BalancerAttributes != nil {
164+
balAtrStr = a.BalancerAttributes.String()
165+
}
166+
sb.WriteString(fmt.Sprintf("BalancerAttributes: %v}", balAtrStr))
167+
return sb.String()
155168
}
156169

157170
// BuildOptions includes additional information for the builder to create

0 commit comments

Comments
 (0)